1 # ##### BEGIN GPL LICENSE BLOCK #####
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software Foundation,
15 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 # ##### END GPL LICENSE BLOCK #####
22 from http
.server
import BaseHTTPRequestHandler
, HTTPServer
23 from urllib
.parse
import parse_qs
, quote
as urlquote
, urlparse
28 class SimpleOAuthAuthenticator(object):
29 def __init__(self
, server_url
, client_id
, ports
):
30 self
.server_url
= server_url
31 self
.client_id
= client_id
34 def _get_tokens(self
, authorization_code
=None, refresh_token
=None, grant_type
="authorization_code"):
36 "grant_type": grant_type
,
37 "state": "random_state_string",
38 "client_id": self
.client_id
,
39 "scopes": "read write",
41 if hasattr(self
, 'redirect_uri'):
42 data
["redirect_uri"] = self
.redirect_uri
43 if authorization_code
:
44 data
['code'] = authorization_code
46 data
['refresh_token'] = refresh_token
48 response
= requests
.post(
49 '%s/o/token/' % self
.server_url
,
52 if response
.status_code
!= 200:
53 print("error retrieving refresh tokens %s" % response
.status_code
)
54 print(response
.content
)
55 return None, None, None
57 response_json
= json
.loads(response
.content
)
58 refresh_token
= response_json
['refresh_token']
59 access_token
= response_json
['access_token']
60 return access_token
, refresh_token
, response_json
62 def get_new_token(self
, register
=True, redirect_url
=None):
63 class HTTPServerHandler(BaseHTTPRequestHandler
):
64 html_template
= '<html>%(head)s<h1>%(message)s</h1></html>'
66 self
.send_response(200)
67 self
.send_header('Content-type', 'text/html')
69 if 'code' in self
.path
:
70 self
.auth_code
= self
.path
.split('=')[1]
71 # Display to the user that they no longer need the browser window
74 '<head><meta http-equiv="refresh" content="0;url=%(redirect_url)s"></head>'
75 '<script> window.location.href="%(redirect_url)s"; </script>' % {'redirect_url': redirect_url
}
79 self
.wfile
.write(bytes(self
.html_template
% {'head': redirect_string
, 'message': 'You may now close this window.'}, 'utf-8'))
80 qs
= parse_qs(urlparse(self
.path
).query
)
81 self
.server
.authorization_code
= qs
['code'][0]
83 self
.wfile
.write(bytes(self
.html_template
% {'head': '', 'message': 'Authorization failed.'}, 'utf-8'))
85 for port
in self
.ports
:
87 httpServer
= HTTPServer(('localhost', port
), HTTPServerHandler
)
91 self
.redirect_uri
= "http://localhost:%s/consumer/exchange/" % port
93 "/o/authorize?client_id=%s&state=random_state_string&response_type=code&"
94 "redirect_uri=%s" % (self
.client_id
, self
.redirect_uri
)
97 authorize_url
= "%s/accounts/register/?next=%s" % (self
.server_url
, urlquote(authorize_url
))
99 authorize_url
= "%s%s" % (self
.server_url
, authorize_url
)
100 webbrowser
.open_new(authorize_url
)
102 httpServer
.handle_request()
103 authorization_code
= httpServer
.authorization_code
104 return self
._get
_tokens
(authorization_code
=authorization_code
)
106 def get_refreshed_token(self
, refresh_token
):
107 return self
._get
_tokens
(refresh_token
=refresh_token
, grant_type
="refresh_token")