Merge remote-tracking branch 'refs/remotes/breton/new_gst10'
[larjonas-mediagoblin.git] / mediagoblin / decorators.py
blobb5ec0ce81e4eebdbf964d29f4bcd078d9935d2bf
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from functools import wraps
19 from werkzeug.exceptions import Forbidden, NotFound
20 from oauthlib.oauth1 import ResourceEndpoint
22 from six.moves.urllib.parse import urljoin
24 from mediagoblin import mg_globals as mgg
25 from mediagoblin import messages
26 from mediagoblin.db.models import MediaEntry, User, MediaComment, AccessToken
27 from mediagoblin.tools.response import (
28 redirect, render_404,
29 render_user_banned, json_response)
30 from mediagoblin.tools.translate import pass_to_ugettext as _
32 from mediagoblin.oauth.tools.request import decode_authorization_header
33 from mediagoblin.oauth.oauth import GMGRequestValidator
def user_not_banned(controller):
    """
    Keep banned users away from the wrapped view.

    If the request carries a user and that user's ``is_banned()`` is true,
    the response of ``render_user_banned`` is returned instead of calling
    the view. Requests without a user, or with a user that is not banned,
    are handed to the view unchanged.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        # Anonymous requests have no user to inspect, so they pass through.
        banned = bool(request.user) and request.user.is_banned()
        if banned:
            return render_user_banned(request)
        return controller(request, *args, **kwargs)

    return wrapper
def require_active_login(controller):
    """
    Require an active login from the user.

    The wrapped view is first passed through ``user_not_banned``, so a
    banned user gets the "You are Banned" page. After that:

    * no logged-in user -> redirect to the login page, with ``next`` set to
      the URL that was requested;
    * logged-in user lacking the ``active`` privilege -> redirect to that
      user's home page;
    * otherwise the view is called.
    """
    @wraps(controller)
    @user_not_banned
    def new_controller_func(request, *args, **kwargs):
        if not request.user:
            # Build an absolute "come back here" URL relative to the
            # fully-qualified login URL.
            next_url = urljoin(
                request.urlgen('mediagoblin.auth.login',
                               qualified=True),
                request.url)

            return redirect(request, 'mediagoblin.auth.login',
                            next=next_url)

        # From here on a user is present, so the privilege needs to be
        # looked up only once (the old ``elif`` re-tested it needlessly).
        if not request.user.has_privilege(u'active'):
            return redirect(
                request, 'mediagoblin.user_pages.user_home',
                user=request.user.username)

        return controller(request, *args, **kwargs)

    return new_controller_func
def user_has_privilege(privilege_name, allow_admin=True):
    """
    Build a decorator that only lets users holding a privilege reach a view.

    To demand several privileges, stack this decorator once per privilege.
    The view is also wrapped in ``require_active_login``, so the login and
    ban checks of that decorator run before the privilege test.

    :param privilege_name   Unicode name of the required privilege, passed
                            as the first argument to
                            ``request.user.has_privilege``.

    :param allow_admin      Forwarded as the second argument to
                            ``has_privilege``; per the original
                            documentation, when true an admin is let
                            through even without the named privilege.

    Raises ``Forbidden`` when the privilege check fails.
    """

    def user_has_privilege_decorator(controller):
        @wraps(controller)
        @require_active_login
        def wrapper(request, *args, **kwargs):
            permitted = request.user.has_privilege(
                privilege_name, allow_admin)
            if permitted:
                return controller(request, *args, **kwargs)

            raise Forbidden()

        return wrapper
    return user_has_privilege_decorator
def active_user_from_url(controller):
    """Look up the User named by the <user> URL component.

    The matching User is handed to the view as the ``url_user`` keyword
    argument. When no user has that username, a 404 page is returned.

    NOTE(review): despite the name, only existence is checked here; no
    "active" test is visible in this decorator.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        username = request.matchdict['user']
        url_user = User.query.filter_by(username=username).first()
        if url_user is not None:
            return controller(request, *args, url_user=url_user, **kwargs)

        return render_404(request)

    return wrapper
def user_may_delete_media(controller):
    """
    Only let the media's uploader, or an admin, reach the wrapped view.

    Expects the view to be called with a ``media`` keyword argument whose
    ``uploader`` attribute is compared with ``request.user.id``. Raises
    ``Forbidden`` for anyone else.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        owner_id = kwargs['media'].uploader
        # Admin check comes first; ownership is only compared if it fails.
        allowed = (request.user.has_privilege(u'admin')
                   or request.user.id == owner_id)
        if not allowed:
            raise Forbidden()

        return controller(request, *args, **kwargs)

    return wrapper
def user_may_alter_collection(controller):
    """
    Only let the collection's creator, or an admin, reach the wrapped view.

    The creator is the User whose username equals the <user> URL component.
    If no such user exists a 404 page is returned (previously this case
    crashed with an AttributeError on ``None.id``). Raises ``Forbidden``
    when the requesting user is neither an admin nor that creator.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        creator = request.db.User.query.filter_by(
            username=request.matchdict['user']).first()
        if creator is None:
            # Unknown username in the URL: nothing to own, nothing to alter.
            return render_404(request)

        if not (request.user.has_privilege(u'admin') or
                request.user.id == creator.id):
            raise Forbidden()

        return controller(request, *args, **kwargs)

    return wrapper
def uses_pagination(controller):
    """
    Validate the GET 'page' parameter and pass it to the view as ``page``.

    A missing parameter defaults to 1. A value that ``int()`` rejects with
    ValueError, or a negative number, yields a 404 page instead of calling
    the view.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        try:
            page = int(request.GET.get('page', 1))
        except ValueError:
            return render_404(request)

        # NOTE(review): page 0 is accepted here although the default is 1;
        # confirm whether the paginator treats 0 as valid.
        if page < 0:
            return render_404(request)

        return controller(request, page=page, *args, **kwargs)

    return wrapper
def get_user_media_entry(controller):
    """
    Resolve the <user>/<media> URL components to a processed MediaEntry.

    The <media> component is either a slug or, when prefixed with ``id:``,
    a numeric id. Only entries in state ``processed`` uploaded by the named
    user are considered. The entry is passed to the view as ``media``.
    Raises ``NotFound`` when the user, the id syntax, or the entry is bad.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        owner = User.query.filter_by(
            username=request.matchdict['user']).first()
        if not owner:
            raise NotFound()

        # Could be a slug, could be an "id:<number>" reference.
        slug_or_id = request.matchdict['media']
        criteria = {'state': u'processed', 'uploader': owner.id}

        if slug_or_id.startswith(u'id:'):
            try:
                # Everything after the "id:" prefix must parse as an int.
                criteria['id'] = int(slug_or_id[3:])
                media = MediaEntry.query.filter_by(**criteria).first()
            except ValueError:
                raise NotFound()
        else:
            # No "id:" prefix, so treat the component as a slug.
            criteria['slug'] = slug_or_id
            media = MediaEntry.query.filter_by(**criteria).first()

        if not media:
            # Nothing matched: 404.
            raise NotFound()

        return controller(request, media=media, *args, **kwargs)

    return wrapper
def get_user_collection(controller):
    """
    Resolve the <user>/<collection> URL components to a Collection.

    The collection is matched by slug and by the named user's id as its
    creator, then passed to the view as ``collection``. A 404 page is
    returned when either the user or the collection cannot be found.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        owner = request.db.User.query.filter_by(
            username=request.matchdict['user']).first()
        if not owner:
            return render_404(request)

        found = request.db.Collection.query.filter_by(
            slug=request.matchdict['collection'],
            creator=owner.id).first()
        if found:
            return controller(request, collection=found, *args, **kwargs)

        # User exists but has no collection with that slug.
        return render_404(request)

    return wrapper
def get_user_collection_item(controller):
    """
    Resolve the <collection_item> URL component to a CollectionItem.

    The named <user> must exist, and the item is then looked up by its id
    and passed to the view as ``collection_item``. A 404 page is returned
    when either lookup finds nothing.

    NOTE(review): the item query filters on id only, so it is not tied to
    the user from the URL here; confirm that callers check ownership.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        owner = request.db.User.query.filter_by(
            username=request.matchdict['user']).first()
        if not owner:
            return render_404(request)

        item = request.db.CollectionItem.query.filter_by(
            id=request.matchdict['collection_item']).first()
        if item:
            return controller(
                request, collection_item=item, *args, **kwargs)

        # No item with that id.
        return render_404(request)

    return wrapper
def get_media_entry_by_id(controller):
    """
    Resolve the <media_id> URL component to a processed MediaEntry.

    The entry is passed to the view as ``media``. A 404 page is returned
    when no processed entry has that id, or when the URL also carries a
    <user> component that differs from the uploader's username.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        media = MediaEntry.query.filter_by(
            id=request.matchdict['media_id'],
            state=u'processed').first()
        if not media:
            return render_404(request)

        # The username is optional in the URL; if given, it must match.
        url_username = request.matchdict.get('user')
        if url_username:
            if url_username != media.get_uploader.username:
                return render_404(request)

        return controller(request, media=media, *args, **kwargs)

    return wrapper
def get_workbench(func):
    """Decorator that supplies a workbench as the ``workbench`` kwarg.

    The workbench comes from ``mgg.workbench_manager.create()`` used as a
    context manager, so it is released when the wrapped call finishes.
    """

    @wraps(func)
    def new_func(*args, **kwargs):
        workbench_context = mgg.workbench_manager.create()
        with workbench_context as workbench:
            result = func(*args, workbench=workbench, **kwargs)
            return result

    return new_func
def allow_registration(controller):
    """Only run the view when ``allow_registration`` is set in app_config.

    Otherwise a warning message is queued and the request is redirected to
    the index page.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        if mgg.app_config["allow_registration"]:
            return controller(request, *args, **kwargs)

        messages.add_message(
            request,
            messages.WARNING,
            _('Sorry, registration is disabled on this instance.'))
        return redirect(request, "index")

    return wrapper
def allow_reporting(controller):
    """Only run the view when ``allow_reporting`` is set in app_config.

    Otherwise a warning message is queued and the request is redirected to
    the index page.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        if mgg.app_config["allow_reporting"]:
            return controller(request, *args, **kwargs)

        messages.add_message(
            request,
            messages.WARNING,
            _('Sorry, reporting is disabled on this instance.'))
        return redirect(request, 'index')

    return wrapper
def get_optional_media_comment_by_id(controller):
    """
    Pass in a MediaComment based off of an optional url component.

    This decorator serves both media reports and comment reports, so a
    missing comment id is a valid case.

    :returns The wrapped view called with kwarg `comment` set to the
             MediaComment whose id is in the URL, if the URL has a comment
             id and it matches a comment.
    :returns The wrapped view called with kwarg `comment` set to None, if
             the URL has no comment id.
    :returns A 404 error page, if the URL has a comment id that matches no
             comment.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        if 'comment' not in request.matchdict:
            # Reporting the media itself: no comment involved.
            return controller(request, comment=None, *args, **kwargs)

        comment = MediaComment.query.filter_by(
            id=request.matchdict['comment']).first()
        if comment is None:
            return render_404(request)

        return controller(request, comment=comment, *args, **kwargs)

    return wrapper
def auth_enabled(controller):
    """Only run the view when ``mgg.app.auth`` is truthy.

    Otherwise a warning message is queued and the request is redirected to
    the index page.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        if mgg.app.auth:
            return controller(request, *args, **kwargs)

        messages.add_message(
            request,
            messages.WARNING,
            _('Sorry, authentication is disabled on this instance.'))
        return redirect(request, 'index')

    return wrapper
def require_admin_or_moderator_login(controller):
    """
    Require a login from an administrator or a moderator.

    Without a logged-in user the request is redirected to the login page,
    with ``next`` pointing back at the requested URL. A logged-in user
    holding neither the ``admin`` nor the ``moderator`` privilege gets
    ``Forbidden``.
    """
    @wraps(controller)
    def new_controller_func(request, *args, **kwargs):
        if not request.user:
            login_url = request.urlgen('mediagoblin.auth.login',
                                       qualified=True)
            next_url = urljoin(login_url, request.url)

            return redirect(request, 'mediagoblin.auth.login',
                            next=next_url)

        # Moderator status is only consulted when the user is not an admin.
        is_staff = (request.user.has_privilege(u'admin')
                    or request.user.has_privilege(u'moderator'))
        if not is_staff:
            raise Forbidden()

        return controller(request, *args, **kwargs)

    return new_controller_func
def oauth_required(controller):
    """ Used to wrap API endpoints where oauth is required

    The Authorization header is decoded and the request is validated
    through oauthlib's ``ResourceEndpoint`` using ``GMGRequestValidator``.
    An empty authorization dict or a failed validation produces a JSON
    error response with status 400. On success ``request.access_token`` is
    set from the ``oauth_token`` value, and ``request.user`` is filled in
    from that token when it was not already set.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        authorization = decode_authorization_header(request.headers)

        if authorization == {}:
            return json_response(
                {"error": "Missing required parameter."}, status=400)

        validator = GMGRequestValidator()
        endpoint = ResourceEndpoint(validator)
        # Only the validity flag is used; the second value is ignored.
        valid, _oauth_request = endpoint.validate_protected_resource_request(
            uri=request.base_url,
            http_method=request.method,
            body=request.data,
            headers=dict(request.headers),
        )

        if not valid:
            # Message text (including its spelling) is kept as emitted.
            return json_response(
                {"error": "Invalid oauth prarameter."}, status=400)

        # Fill user if not already
        token = authorization[u"oauth_token"]
        access_token = AccessToken.query.filter_by(token=token).first()
        request.access_token = access_token
        if request.access_token is not None and request.user is None:
            owner_id = request.access_token.user
            request.user = User.query.filter_by(id=owner_id).first()

        return controller(request, *args, **kwargs)

    return wrapper