Fix GitLab pipeline status badge link.
[mailman.git] / src / mailman / rest / validator.py
blob b7817b38c1e4d01ae0d7d83b460c7d78fe6a06ce
1 # Copyright (C) 2010-2023 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <https://www.gnu.org/licenses/>.
18 """REST web form validation."""
20 import re
22 from lazr.config import as_boolean
23 from mailman.interfaces.address import IEmailValidator
24 from mailman.interfaces.errors import MailmanError
25 from mailman.interfaces.languages import ILanguageManager
26 from mailman.rest.helpers import get_request_params
27 from public import public
28 from urllib.parse import urlparse
29 from zope.component import getUtility
# Separator used when joining several parameter names into one error message.
COMMASPACE = ', '
35 @public
class RESTError(MailmanError):
    """Common ancestor of the errors raised by the REST API layer."""
40 @public
class UnknownPATCHRequestError(RESTError):
    """A PATCH request contained an unknown attribute.

    :ivar attribute: The name of the offending attribute.
    """

    def __init__(self, attribute):
        # Hand the attribute to the base exception so that ``args``,
        # ``str()`` and ``repr()`` identify it instead of being empty.
        super().__init__(attribute)
        self.attribute = attribute
48 @public
class ReadOnlyPATCHRequestError(RESTError):
    """A PATCH request contained a read-only attribute.

    :ivar attribute: The name of the offending attribute.
    """

    def __init__(self, attribute):
        # Hand the attribute to the base exception so that ``args``,
        # ``str()`` and ``repr()`` identify it instead of being empty.
        super().__init__(attribute)
        self.attribute = attribute
56 @public
class enum_validator:
    """Callable that maps the name of an enum member onto the member itself.

    :param enum_class: The enumeration whose members are looked up by name.
    :param allow_blank: Keyword-only.  When true, a false-y input (such as
        the empty string) is accepted and converted to ``None``.
    """

    def __init__(self, enum_class, *, allow_blank=False):
        self._allow_blank = allow_blank
        self._enum_class = enum_class

    @property
    def _accepted_values(self):
        """The names of all members of the enum, joined with ', '."""
        names = [member._name_ for member in self._enum_class]
        return ', '.join(names)

    def __call__(self, enum_value):
        """Return the enum member called `enum_value`.

        :raises ValueError: if the enum has no member with that name.  The
            message lists the accepted names.
        """
        is_blank = not enum_value
        if is_blank and self._allow_blank:
            return None
        # Looking up an unknown name raises KeyError, but the Validator API
        # requires a ValueError, so translate it here.
        try:
            member = self._enum_class[enum_value]
        except KeyError:
            raise ValueError(
                'Accepted Values are: {}'.format(self._accepted_values))
        return member
82 @public
def subscriber_validator(api):
    """Build a validator converting an email-or-(int|hex) to an email-or-UUID.

    :param api: An object whose ``to_uuid()`` method converts the textual
        form of a UUID and raises ``ValueError`` when it cannot.
    :return: A one-argument callable.  It returns whatever ``to_uuid()``
        returns when that succeeds, otherwise the input unchanged if it is
        a valid email address.
    :raises ValueError: (from the returned callable) if the value is
        neither convertible by ``to_uuid()`` nor a valid email address.
    """
    def _inner(subscriber):
        try:
            return api.to_uuid(subscriber)
        except ValueError:
            # Not a UUID, so it must be an email address.  Fall through so
            # that a failure below is not chained to this exception.
            pass
        if getUtility(IEmailValidator).is_valid(subscriber):
            return subscriber
        # Say what was expected; a bare ValueError would leave the client
        # with an empty explanation.
        raise ValueError(
            'Expected an email address or a UUID, got {!r}'.format(
                subscriber))
    return _inner
96 @public
def language_validator(code):
    """Convert a language code to a Language object.

    :param code: The language code to look up in the language manager.
    :return: The object the language manager stores under `code`.
    :raises ValueError: if the language manager does not know `code`.
    """
    languages = getUtility(ILanguageManager)
    try:
        return languages[code]
    except KeyError:
        # Validators signal bad input with ValueError, so translate the
        # failed lookup instead of leaking a KeyError to the caller.
        raise ValueError('Unknown language code: {}'.format(code)) from None
102 @public
def list_of_strings_validator(values):
    """Normalize a single string, or a sequence of strings, to a sequence.

    The empty string stands for the empty list.  There is no good way to
    pass an empty list through the HTTP API, so Core and Postorius agree on
    this convention.  It also prevents ``''`` from being turned into the
    bogus singleton list ``['']``, which would cause problems later on.

    :param values: A list or tuple of strings, or a single value.
    :return: ``[]`` for the empty string, the given list or tuple itself
        when one was passed, otherwise a one-element list.
    :raises ValueError: if any item is not a ``str``.
    """
    if values == '':
        return []
    candidates = values if isinstance(values, (list, tuple)) else [values]
    offenders = [item for item in candidates if not isinstance(item, str)]
    if offenders:
        # Report the first non-string item only.
        raise ValueError('Expected str, got {!r}'.format(offenders[0]))
    return candidates
121 @public
def list_of_emails_validator(values):
    """Normalize one email address, or a sequence of them, to a sequence.

    :param values: A list or tuple of email addresses, or a single address.
    :return: The given list or tuple itself when one was passed, otherwise
        a one-element list holding the single address.
    :raises ValueError: if any address is rejected by the email validator.
    """
    if isinstance(values, (list, tuple)):
        for candidate in values:
            if not getUtility(IEmailValidator).is_valid(candidate):
                raise ValueError(
                    'Expected email address, got {!r}'.format(candidate))
        return values
    # A single, bare value.
    if not getUtility(IEmailValidator).is_valid(values):
        raise ValueError('Bad email address format: {}'.format(values))
    return [values]
134 @public
def list_of_emails_or_regexp_validator(values):
    """Normalize email-or-regexp value(s) to a sequence.

    :param values: A list or tuple of values, a single value, or the empty
        string, which stands for the empty list.
    :return: ``[]`` for the empty string, the given list or tuple itself
        when one was passed, otherwise a one-element list.
    :raises ValueError: if `email_or_regexp_validator` rejects any item.
    """
    if values == '':
        return []
    entries = values if isinstance(values, (list, tuple)) else [values]
    for entry in entries:
        # Called only for the ValueError it raises on a bad entry.
        email_or_regexp_validator(entry)
    return entries
145 @public
def list_of_emails_or_regexp_or_atlist_validator(values):
    """Normalize email, regexp or @list value(s) to a sequence.

    :param values: A list or tuple of values, a single value, or the empty
        string, which stands for the empty list.
    :return: ``[]`` for the empty string, the given list or tuple itself
        when one was passed, otherwise a one-element list.
    :raises ValueError: if `email_or_regexp_or_atlist_validator` rejects
        any item.
    """
    if values == '':
        return []
    entries = values if isinstance(values, (list, tuple)) else [values]
    for entry in entries:
        # Called only for the ValueError it raises on a bad entry.
        email_or_regexp_or_atlist_validator(entry)
    return entries
156 @public
def integer_ge_zero_validator(value):
    """Convert the value with ``int()`` and require the result to be >= 0.

    :param value: Anything ``int()`` accepts.
    :return: The converted integer.
    :raises ValueError: if the converted integer is negative, or if
        ``int()`` itself rejects the value.
    """
    number = int(value)
    if number >= 0:
        return number
    raise ValueError('Expected a non-negative integer: {}'.format(number))
165 @public
def regexp_validator(value):                        # pragma: missed
    """Check that the value compiles as a regular expression.

    :param value: The pattern to check.
    :return: The value, unchanged.
    :raises ValueError: if ``re.compile()`` rejects the pattern.
    """
    # The tests test_add_bad_regexp and test_patch_bad_regexp in
    # mailman/rest/tests/test_header_matches.py fail with "AssertionError:
    # HTTPError not raised" when this code is bypassed, which proves it is
    # exercised.  Coverage nevertheless reports it as not covered, hence the
    # pragma above as a workaround for now.
    try:
        re.compile(value)
    except re.error:
        raise ValueError('Expected a valid regexp, got {}'.format(value))
    else:
        return value
180 @public
def email_or_regexp_validator(value):
    """Email or regular expression validator.

    Validate that the value is not null and is a valid regular expression
    or email.  A string starting with ``^`` is regarded as a regular
    expression; any other string is checked as an email address.

    :param value: The value to check.
    :return: The value, unchanged.
    :raises ValueError: if the value is empty, is not a string, or is
        neither a valid regular expression nor a valid email address.
    """
    if not value:
        raise ValueError(
            'Expected a valid email address or regular expression, got empty')
    if not isinstance(value, str):
        # Values decoded from a request body need not be strings.  Reject
        # them here rather than failing with AttributeError on .startswith().
        valid = False
    elif value.startswith('^'):
        try:
            regexp_validator(value)
        except ValueError:
            valid = False
        else:
            valid = True
    else:
        valid = getUtility(IEmailValidator).is_valid(value)
    if not valid:
        raise ValueError(
            'Expected a valid email address or regular expression,'
            ' got {}'.format(value))
    return value
208 @public
def email_or_regexp_or_atlist_validator(value):
    """Email or regular expression or @list validator.

    Validate that the value is not null and is a valid regular expression
    or email or @fqdn_listname.  A string starting with ``^`` is regarded
    as a regular expression.  A string starting with ``@`` is a list
    posting address, and the part after the ``@`` is checked as an email
    address.  Any other string is checked as an email address.

    :param value: The value to check.
    :return: The value, unchanged.
    :raises ValueError: if the value is empty, is not a string, or matches
        none of the accepted forms.
    """
    if not value:
        raise ValueError(
            'Expected a valid email address, regular expression or '
            '@fqdn_listname, got empty')
    if not isinstance(value, str):
        # Values decoded from a request body need not be strings.  Reject
        # them here rather than failing with AttributeError on .startswith().
        valid = False
    elif value.startswith('^'):
        try:
            regexp_validator(value)
        except ValueError:
            valid = False
        else:
            valid = True
    elif value.startswith('@'):
        # Just validate the posting address as an email.
        valid = getUtility(IEmailValidator).is_valid(value[1:])
    else:
        valid = getUtility(IEmailValidator).is_valid(value)
    if not valid:
        raise ValueError(
            'Expected a valid email address, regular expression or '
            '@fqdn_listname, got {}'.format(value))
    return value
241 @public
def email_validator(value):
    """Check the value against the registered email validator.

    :param value: The candidate email address.
    :return: The value, unchanged.
    :raises ValueError: if the email validator rejects the value.
    """
    checker = getUtility(IEmailValidator)
    if checker.is_valid(value):
        return value
    raise ValueError(
        'Expected a valid email address, got {}'.format(value))
250 @public
def url_validator(value):
    """Check that the value looks like a complete web URL.

    The URL must have a scheme, a network location and a non-empty path.
    A URL without any path, such as ``https://example.com``, is therefore
    rejected, while ``https://example.com/`` is accepted.

    :param value: The URL to check.
    :return: The value, unchanged.
    :raises ValueError: if any of the three components is missing.
    """
    parts = urlparse(value)
    if not all((parts.scheme, parts.netloc, parts.path)):
        raise ValueError("Invalid URL: {value}.".format(value=value))
    return value
259 @public
class Validator:
    """A validator of parameter input.

    Create it with keyword arguments that map each accepted parameter name
    to a converter, which is a callable taking the raw value and returning
    the converted one.  The reserved keyword ``_optional`` is an iterable
    naming the parameters that may be left out of a request.  Every other
    parameter is required.
    """

    def __init__(self, **kws):
        self._optional = set(kws.pop('_optional', ()))
        self._converters = kws.copy()

    def __call__(self, request):
        """Validate and convert the parameters of the request.

        :param request: The HTTP request, as understood by
            `get_request_params()`.
        :return: A dictionary mapping parameter names to converted values.
        :raises ValueError: if the request has a parameter without a
            converter, a value its converter rejects, or lacks a required
            parameter.
        """
        # All keys which show up only once in the form data get a scalar
        # value in the pre-converted dictionary.  All keys which show up
        # more than once get a list value.
        form_data = {}
        unset = object()
        # Parse the items from request depending on the content type.
        for key, new_value in get_request_params(request).items():
            old_value = form_data.get(key, unset)
            if old_value is unset:
                form_data[key] = new_value
            elif isinstance(old_value, list):
                old_value.append(new_value)
            else:
                form_data[key] = [old_value, new_value]
        # Now do all the conversions.
        values = {}
        extras = set()
        cannot_convert = set()
        for key, value in form_data.items():
            # Look the converter up separately from calling it.  Otherwise
            # a KeyError raised inside a converter would be mistaken for an
            # unexpected parameter.
            if key not in self._converters:
                extras.add(key)
                continue
            converter = self._converters[key]
            if converter is as_boolean and isinstance(value, bool):
                # The value is already a boolean, so it is used as is.
                values[key] = value
                continue
            try:
                values[key] = converter(value)
            except (KeyError, TypeError, ValueError) as error:
                cannot_convert.add((key, str(error)))
        # Make sure there are no unexpected values.
        if extras:
            raise ValueError('Unexpected parameters: {}'.format(
                COMMASPACE.join(sorted(extras))))
        # Make sure everything could be converted.
        if cannot_convert:
            invalid_msg = [
                'Invalid Parameter "{0}": {1}.'.format(*param)
                for param in sorted(cannot_convert)
                ]
            raise ValueError(' '.join(invalid_msg))
        # Make sure nothing's missing.
        required_keys = set(self._converters) - self._optional
        absent = required_keys - set(values)
        if absent:
            raise ValueError('Missing Parameter: {}'.format(
                COMMASPACE.join(sorted(absent))))
        return values

    def update(self, obj, request):
        """Update the object with the values in the request.

        This first validates and converts the attributes in the request,
        then updates the given object with the newly converted values by
        calling ``put(obj, key, value)`` on each attribute's converter.

        :param obj: The object to update.
        :type obj: object
        :param request: The HTTP request.
        :raises ValueError: if validation or conversion failed for some
            attribute.
        """
        for key, value in self.__call__(request).items():
            self._converters[key].put(obj, key, value)
341 @public
class PatchValidator(Validator):
    """Create a special validator for PATCH requests.

    PATCH is different than PUT because with the latter, you're changing
    the entire resource, so all expected attributes must exist.  With the
    former, you're only changing a subset of the attributes, so you only
    validate the ones that exist in the request.
    """

    def __init__(self, request, converters):
        """Create a validator for the PATCH request.

        :param request: The request object, whose parameters are read with
            `get_request_params()`.
        :param converters: A mapping of attribute names to the converter
            for that attribute's type.  Generally, this will be a
            GetterSetter instance, but it might be something more specific
            for custom data types (e.g. non-basic types like unicodes).
            Each converter must have a ``decoder`` attribute; ``None``
            marks the attribute as read-only.
        :raises UnknownPATCHRequestError: if the request contains an
            unknown attribute, i.e. one that is not in the `converters`
            mapping.
        :raises ReadOnlyPATCHRequestError: if the request contains an
            attribute that is defined as read-only.
        """
        selected = {}
        # Parse the items from request depending on the content type.
        requested = get_request_params(request)
        for attribute in requested:
            if attribute not in converters:
                raise UnknownPATCHRequestError(attribute)
            converter = converters[attribute]
            if converter.decoder is None:
                raise ReadOnlyPATCHRequestError(attribute)
            selected[attribute] = converter
        # Only the attributes named in the request are validated.
        super().__init__(**selected)