2 # -*- coding: utf-8 -*-
3 # Simple HTTP web server for testing HTTP Authentication (see bug 1539)
4 # from our crappy-but-does-the-job department
5 # Thomas Perl <thp.io/about>; 2012-01-20
16 USERNAME
= 'user@example.com' # Username used for HTTP Authentication
17 PASSWORD
= 'secret' # Password used for HTTP Authentication
19 HOST
, PORT
, RPORT
= 'localhost', 8000, 8001 # Hostname and port for the HTTP server
21 # When the script contents change, the feed's episodes each get a new GUID
22 GUID
= hashlib
.sha1(open(__file__
, mode
='rb').read()).hexdigest()
24 URL
= 'http://%(HOST)s:%(PORT)s' % locals()
26 FEEDNAME
= sys
.argv
[0] # The title of the RSS feed
27 REDIRECT
= 'redirect.rss' # The path for a redirection
28 REDIRECT_TO_BAD_HOST
= 'redirect_bad' # The path for a redirection
29 FEEDFILE
= 'feed.rss' # The "filename" of the feed on the server
30 EPISODES
= 'episode' # Base name for the episode files
31 TIMEOUT
= 'timeout' # The path to never return
32 EPISODES_EXT
= '.mp3' # Extension for the episode files
33 EPISODES_MIME
= 'audio/mpeg' # Mime type for the episode files
34 EP_COUNT
= 7 # Number of episodes in the feed
35 SIZE
= 500000 # Size (in bytes) of the episode downloads)
38 def mkpubdates(items
):
39 """Generate fake pubDates (one each day, recently)"""
40 current
= datetime
.datetime
.now() - datetime
.timedelta(days
=items
+ 3)
41 for i
in range(items
):
43 current
+= datetime
.timedelta(days
=1)
46 def mkrss(items
=EP_COUNT
):
47 """Generate a dumm RSS feed with a given number of items"""
50 <title>Episode %(INDEX)s</title>
51 <guid>tag:test.gpodder.org,2012:%(GUID)s,%(URL)s,%(INDEX)s</guid>
52 <pubDate>%(PUBDATE)s</pubDate>
54 url="%(URL)s/%(EPISODES)s%(INDEX)s%(EPISODES_EXT)s"
55 type="%(EPISODES_MIME)s"
58 """ % dict(list(locals().items()) + list(globals().items()))
59 for INDEX
, PUBDATE
in enumerate(mkpubdates(items
)))
62 <title>Missing Episode</title>
63 <guid>tag:test.gpodder.org,2012:missing</guid>
64 <pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
66 url="%(URL)s/not_there%(EPISODES_EXT)s"
67 type="%(EPISODES_MIME)s"
69 </item>""" % dict(list(locals().items()) + list(globals().items()))
72 <title>Server Timeout Episode</title>
73 <guid>tag:test.gpodder.org,2012:timeout</guid>
74 <pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
76 url="%(URL)s/%(TIMEOUT)s"
77 type="%(EPISODES_MIME)s"
79 </item>""" % dict(list(locals().items()) + list(globals().items()))
82 <title>Bad Host Episode</title>
83 <guid>tag:test.gpodder.org,2012:timeout</guid>
84 <pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
86 url="%(URL)s/%(REDIRECT_TO_BAD_HOST)s"
87 type="%(EPISODES_MIME)s"
89 </item>""" % dict(list(locals().items()) + list(globals().items()))
92 <title>Space in url Episode</title>
93 <guid>tag:test.gpodder.org,2012:timeout</guid>
94 <pubDate>Sun, 25 Nov 2018 17:28:03 +0000</pubDate>
96 url="%(URL)s/%(EPISODES)s with space%(EPISODES_EXT)s"
97 type="%(EPISODES_MIME)s"
99 </item>""" % dict(list(locals().items()) + list(globals().items()))
103 <channel><title>%(FEEDNAME)s</title><link>%(URL)s</link>
107 """ % dict(list(locals().items()) + list(globals().items()))
110 def mkdata(size
=SIZE
):
111 """Generate dummy data of a given size (in bytes)"""
112 return bytes([32 + (i
% (127 - 32)) for i
in range(size
)])
115 class AuthRequestHandler(http
.server
.BaseHTTPRequestHandler
):
116 FEEDFILE_PATH
= '/%s' % FEEDFILE
117 EPISODES_PATH
= '/%s' % EPISODES
118 REDIRECT_PATH
= '/%s' % REDIRECT
119 REDIRECT_TO_BAD_HOST_PATH
= '/%s' % REDIRECT_TO_BAD_HOST
120 TIMEOUT_PATH
= '/%s' % TIMEOUT
127 auth_header
= self
.headers
.get('authorization', '')
128 m
= re
.match(r
'^Basic (.*)$', auth_header
)
130 auth_data
= base64
.b64decode(m
.group(1)).decode().split(':', 1)
131 if len(auth_data
) == 2:
132 username
, password
= auth_data
133 print('Got username:', username
)
134 print('Got password:', password
)
135 if (username
, password
) == (USERNAME
, PASSWORD
):
136 print('Valid credentials provided.')
139 if self
.path
== self
.FEEDFILE_PATH
:
140 print('Feed request.')
142 elif self
.path
.startswith(self
.EPISODES_PATH
):
143 print('Episode request.')
145 elif self
.path
== self
.REDIRECT_PATH
:
146 print('Redirect request.')
147 self
.send_response(302)
148 self
.send_header('Location', '%s/%s' % (URL
, FEEDFILE
))
151 elif self
.path
.startswith(self
.REDIRECT_TO_BAD_HOST_PATH
):
152 print('Redirect request => bad host.')
153 self
.send_response(302)
154 self
.send_header('Location', '//notthere.gpodder.io/%s' % (FEEDFILE
))
157 elif self
.path
== self
.TIMEOUT_PATH
:
158 # will need to restart the server or wait 80s before next request
163 print('Not authorized - sending WWW-Authenticate header.')
164 self
.send_response(401)
165 self
.send_header('WWW-Authenticate',
166 'Basic realm="%s"' % sys
.argv
[0])
169 if not is_feed
and not is_episode
:
170 print('Not there episode - sending 404.')
171 self
.send_response(404)
175 self
.send_response(200)
176 self
.send_header('Content-type',
177 'application/xml' if is_feed
else 'audio/mpeg')
179 self
.wfile
.write(mkrss().encode('utf-8') if is_feed
else mkdata())
184 httpd
.handle_request()
187 if __name__
== '__main__':
188 httpd
= http
.server
.HTTPServer((HOST
, PORT
), AuthRequestHandler
)
190 Feed URL: %(URL)s/%(FEEDFILE)s
191 Redirect URL: http://%(HOST)s:%(RPORT)d/%(REDIRECT)s
192 Timeout URL: %(URL)s/%(TIMEOUT)s
193 Username: %(USERNAME)s
194 Password: %(PASSWORD)s
196 httpdr
= http
.server
.HTTPServer((HOST
, RPORT
), AuthRequestHandler
)
197 t1
= threading
.Thread(name
='http', target
=run
, args
=(httpd
,), daemon
=True)
199 t2
= threading
.Thread(name
='http redirect', target
=run
, args
=(httpdr
,), daemon
=True)
204 except KeyboardInterrupt: