Quick binary expression handling for “test_token_buffering”
[vadmium-streams.git] / hostdl.py
blob1c1782490a34d0f7d1cf53cb8d5f03e11b55f525
1 #! /usr/bin/env python3
3 from sys import stderr
4 from io import TextIOWrapper
5 from net import PersistentConnectionHandler, request_cached, header_list
6 import urllib.request
7 from xml.etree.ElementTree import TreeBuilder
8 from contextlib import ExitStack
9 from html.parser import HTMLParser
10 from streams import DelegateWriter
11 from shutil import copyfileobj
12 from gzip import GzipFile
13 from urllib.parse import urlencode, urljoin
14 from time import sleep
15 from datetime import date
16 from ssl import SSLContext, PROTOCOL_TLSv1_2
17 from tkinter import Tk, Label, PhotoImage, Entry, StringVar
18 import javascript
19 from io import StringIO
def main(url, *, tls1_2=False):
    """Fetch the page at *url*, identify its host and print a download link.

    The parsed page is offered to each function in "probes" in turn.  The
    first true result supplies a sequence of (label, value) properties,
    which are reported on stderr, and a download function, whose return
    value is printed on stdout.

    If *tls1_2* is true, the connection handler is given an SSLContext
    created with PROTOCOL_TLSv1_2.  Raises SystemExit if no probe returns
    a true result.
    """
    cookies = urllib.request.HTTPCookieProcessor()
    conn_args = {'timeout': 10}
    if tls1_2:
        conn_args['context'] = SSLContext(PROTOCOL_TLSv1_2)
    with PersistentConnectionHandler(**conn_args) as conn:
        handlers = (conn, cookies)
        opener = urllib.request.build_opener(*handlers)
        response = request_html(url, urlopen=opener.open)

        # Probes are evaluated lazily, stopping at the first true result
        matches = filter(None, (probe(response) for probe in probes))
        match = next(matches, None)
        if match is None:
            raise SystemExit('No site matched')
        [props, dl] = match

        for prop in props:
            print("{}: {}".format(*prop), file=stderr)
        stderr.flush()
        print(dl(response, url, handlers))
# Functions that each inspect a parsed page and return a true value if they
# recognise it.  Filled in by the "@probes.append" decorators below.
probes = []
# Note: list.append() returns None, so after decoration the function is
# only reachable through "probes", not through its module-level name.
@probes.append
def uptobox_probe(response):
    """Recognise a page whose first <h1> reads "<filename> (<size> MB)".

    Returns None if the page has no <h1> element or the heading text does
    not contain " (".  Otherwise returns the 'Filename' and 'Size'
    properties paired with uptobox_dl().
    """
    heading = next(response.iter("h1"), None)
    if heading is None:
        return None
    text = "".join(heading.itertext())
    # Split at the last " (" so that the filename may itself contain one
    [filename, opening, size] = text.rpartition(" (")
    if not opening:
        return None
    size = size[:-len(" MB)")]  # Drops the last four characters unchecked
    props = (('Filename', filename), ('Size', mib_size(size)))
    return (props, uptobox_dl)
def uptobox_dl(response, url, handlers):
    """Return the download link from the page, resubmitting until present.

    The link is expected as the first child of the first table cell that
    directly contains an <a> element.  While that first child is not an
    <a> element, the "waitingToken" form field is posted back to *url*
    and the resulting page examined instead.  If the token has no value,
    the number of seconds in the page's "data-remaining-time" attribute is
    waited out first.
    """
    while True:
        link = response.find(".//td[a]")[0]
        if link.tag == "a":
            return link.get("href")

        token = response.find(".//input[@name='waitingToken']")
        if not token.get("value"):
            countdown = response.find(".//*[@data-remaining-time]")
            wait(int(countdown.get("data-remaining-time")))
        form = urlencode(((token.get("name"), token.get("value")),))
        content_type = ("Content-Type", "application/x-www-form-urlencoded")
        response = request_html(url, form, method="POST",
            headers=(content_type,),
            data=form.encode("ascii"),
            urlopen=urllib.request.build_opener(*handlers).open)
# Note: list.append() returns None, so after decoration the function is
# only reachable through "probes", not through its module-level name.
@probes.append
def filefactory_probe(response):
    """Recognise a page with a "file_info" element.

    The element's text is expected to read
    "<size> MB uploaded <month name> <day>, <year>".  Returns None if
    there is no such element.  Otherwise returns the 'Filename', 'Size'
    and 'Uploaded' properties paired with filefactory_dl().  Text in any
    other format raises ValueError.
    """
    MONTHS = ('January', 'February', 'March', 'April', 'May', 'June',
        'July', 'August', 'September', 'October', 'November', 'December')

    info = response.findtext(".//*[@id='file_info']")
    if info is None:
        return None
    [size, stamp] = info.split(' MB uploaded ', 1)
    [month, day_year] = stamp.split(' ', 1)
    [day, year] = day_year.split(', ')

    filename = response.findtext(".//*[@class='file-name']")
    size = mib_size(size)
    uploaded = date(int(year), 1 + MONTHS.index(month), int(day))
    props = (
        ('Filename', filename),
        ('Size', size),
        ('Uploaded', uploaded),
    )
    return (props, filefactory_dl)
def filefactory_dl(response, url, handlers):
    """Return the "data-href" attribute of the free download element.

    The element is located by its id, "file-download-free-action-start".
    *url* and *handlers* are accepted for consistency with the other
    download functions but are not used.
    """
    button = response.find(".//*[@id='file-download-free-action-start']")
    href = button.get('data-href')
    return href
# Note: list.append() returns None, so after decoration the function is
# only reachable through "probes", not through its module-level name.
@probes.append
def protected_probe(response):
    """Recognise a page containing a "CaptchaInputText" input field.

    Returns None if there is no such field.  Otherwise returns an empty
    sequence of properties paired with protected_dl().
    """
    captcha = response.find(".//input[@name='CaptchaInputText']")
    if captcha is not None:
        return ((), protected_dl)
    return None
def protected_dl(response, url, handlers):
    """Answer the captcha on the page and return the link it protects.

    The captcha image referenced by the page's form is downloaded and shown
    in a Tk window with a text entry.  Pressing Return in the entry closes
    the window.  The form is then submitted with the typed answer.  The file
    name, size and host names found in the resulting page are reported on
    stderr.  Finally the "link", "token" and "folder" fields are posted to
    "/admin/Main/GetInFo".

    Returns the text following "redirect: " in that last response.  Raises
    ValueError if a page or response is not in the expected format.
    """
    urlopen = urllib.request.build_opener(*handlers).open

    # Download and display the captcha image
    form = response.find(".//*[@class='container body-content']//form")
    src = form.find(".//img[@id='CaptchaImage']").get('src')
    with ExitStack() as cleanup:
        [header, captcha] = request_cached(urljoin(url, src), src,
            cleanup=cleanup,
            urlopen=urlopen,
            headers=(
                ("User-Agent", "hostdl"),
            ),
        )
        stderr.write(header.as_string())
        tk = Tk()
        image = PhotoImage(data=captcha.read())
    Label(tk, image=image).pack()
    answer = StringVar()
    entry = Entry(tk, textvariable=answer)
    entry.bind('<Return>', lambda *pos, **kw: tk.destroy())
    entry.pack()
    entry.focus_set()
    tk.mainloop()
    # A reference to "image" had to be kept alive while it was displayed
    del image

    # Submit the form, substituting the typed answer for the captcha field
    action = form.get('action', "")
    method = form.get('method')
    if method != "post":
        raise ValueError(f'Unexpected form method: {method!r}')
    fields = list()
    for field in form.iterfind(".//input[@name]"):
        name = field.get('name')
        if name == "CaptchaInputText":
            value = answer.get()
        else:
            value = field.get('value')
        fields.append((name, value))
    response = request_html(urljoin(url, action),
        data=urlencode(fields).encode('ascii'),
        urlopen=urlopen,
    )

    # Report the file details, expected in the form "<name> [<size> MB]"
    summary = response.find(".//*[@class='Encrypted-folder']")
    summary = ''.join(summary.itertext())
    if not summary.endswith(']'):
        raise ValueError(f'Unexpected file details: {summary!r}')
    summary = summary[:-1].strip()
    if not summary.endswith('MB'):
        raise ValueError(f'Unexpected file size unit: {summary!r}')
    [filename, size] = summary[:-2].split('[', 1)
    print(f'Filename: {filename.rstrip()}', file=stderr)
    print(f'Size: {mib_size(size.strip())}', file=stderr)

    # Host names are taken from the file names of the host icons
    ICON_PREFIX = '/content/images/bigicon/'
    ICON_SUFFIX = '.png'
    for icon in response.iterfind(".//*[@class='UploadHost']//img"):
        icon = icon.get("src")
        if not (icon.startswith(ICON_PREFIX) and icon.endswith(ICON_SUFFIX)):
            raise ValueError(f'Unexpected host icon: {icon!r}')
        host = icon[len(ICON_PREFIX):-len(ICON_SUFFIX)]
        print(f'Host: {host}', file=stderr)
    slug = response.find(".//*[@class='UploadHost']").get('data-slug')

    # The remaining fields come from variables declared in inline scripts
    fields = {'link': slug}
    VAR_FIELDS = {'token': 'token', 'Slug': 'folder'}
    for script in response.iter('script'):
        script = ''.join(script.itertext())
        if not script:
            continue
        for [stmt, declarations] in javascript.parse(StringIO(script)):
            if stmt != 'var':
                raise ValueError(f'Unhandled script statement: {stmt!r}')
            for [name, value] in declarations:
                if name not in VAR_FIELDS:
                    continue
                if not isinstance(value, str):
                    raise ValueError(
                        f'Unexpected value for {name}: {value!r}')
                fields[VAR_FIELDS[name]] = value

    with ExitStack() as cleanup:
        [_, reply] = request_text(urljoin(url, '/admin/Main/GetInFo'),
            data=urlencode(fields).encode('ascii'),
            cleanup=cleanup,
            urlopen=urlopen,
        )
        reply = reply.read()
    PREFIX = 'redirect: '
    if not reply.startswith(PREFIX):
        raise ValueError(f'Unexpected response: {reply!r}')
    return reply[len(PREFIX):]
def wait(time):
    """Sleep for *time* seconds, announcing the delay on stderr.

    "Waiting " is written first, then the delay as hours, minutes and
    seconds, for example "1h 0m 5s".  Hours are omitted when zero, and
    minutes as well when the delay is under a minute.  The line is ended
    once the sleep has finished.
    """
    stderr.write("Waiting ")
    [mins, secs] = divmod(time, 60)
    pieces = []
    if mins > 0:
        [hrs, mins] = divmod(mins, 60)
        if hrs > 0:
            pieces.append("{}h ".format(hrs))
        pieces.append("{}m ".format(mins))
    pieces.append("{}s".format(secs))
    stderr.write("".join(pieces))
    stderr.flush()  # Show the message before going to sleep

    sleep(time)
    stderr.write("\n")
    stderr.flush()
def mib_size(size):
    """Format a size in mebibytes together with its megabyte equivalent.

    *size* is a decimal number as a string, such as "12.5".  The result
    reads "<size> MiB (<megabytes> MB)", with the megabyte figure shown to
    as many decimal places as *size* has.  A *size* without a decimal
    point is shown with no decimal places.

    Raises ValueError if *size* cannot be converted to a float.
    """
    # Count the digits after the last decimal point, if there is one
    [_, point, fraction] = size.rpartition('.')
    places = len(fraction) if point else 0
    return '{} MiB ({:.{}f} MB)'.format(size, float(size) * 1.024**2, places)
def request_decoded(*pos, headers=(), **kw):
    """Call request_cached() and undo any gzip encoding of the response.

    *headers* is an iterable of (name, value) pairs.  "Accept-Encoding"
    and "User-Agent" headers are added after them.  The caller's object is
    never modified.  Other arguments are passed through to
    request_cached().  The response header is copied to stderr.

    Returns a (header, response) pair, where the response is wrapped in a
    GzipFile if the Content-Encoding header lists "gzip" or "x-gzip".
    Raises TypeError for any other encoding, or if gzip is listed more than
    once.
    """
    # Build a new tuple rather than using "+=", which would extend a list
    # passed by the caller in place
    headers = tuple(headers) + (
        ("Accept-Encoding", "gzip, x-gzip"),
        ("User-Agent", "hostdl"),
    )
    [header, response] = request_cached(*pos, headers=headers, **kw)

    stderr.write(header.as_string())
    for encoding in header_list(header, "Content-Encoding"):
        if encoding.lower() not in {"gzip", "x-gzip"}:
            msg = "Unhandled encoding: " + repr(encoding)
            raise TypeError(msg)
        if isinstance(response, GzipFile):
            raise TypeError("Recursive gzip encoding")
        response = GzipFile(fileobj=response, mode="rb")
    return (header, response)
def request_text(*pos, **kw):
    """Call request_decoded() and wrap the response as a text stream.

    All arguments are passed through to request_decoded().  Returns a
    (header, text stream) pair.  The stream decodes with the charset given
    by the header's get_content_charset(), replacing undecodable bytes.
    """
    [header, response] = request_decoded(*pos, **kw)
    try:
        encoding = header.get_content_charset()
        text = TextIOWrapper(response, encoding, errors='replace')
    except BaseException:
        # Close the byte stream if it could not be wrapped
        response.close()
        raise
    return (header, text)
def request_html(url, *pos, **kw):
    """Request *url* through request_text() and parse the response as HTML.

    'text/html' is passed as the second positional argument, followed by
    any further arguments given.  Returns the root element produced by
    HtmlTreeParser.
    """
    with ExitStack() as cleanup:
        [_, response] = request_text(url, 'text/html',
            *pos, cleanup=cleanup, **kw)
        parser = HtmlTreeParser()
        sink = DelegateWriter(parser.feed)
        # TODO: limit data
        copyfileobj(response, sink)
        tree = parser.close()
    return tree
class HtmlTreeParser(HTMLParser):
    """HTML parser that builds an ElementTree from the markup fed to it.

    Everything is collected under a synthetic root element with an empty
    tag name, which close() returns.  An <img> element is ended implicitly
    by the next start tag, end tag or text, or by close().  No other
    element is ended implicitly.
    """

    def __init__(self):
        super().__init__()
        self._builder = TreeBuilder()
        # True while the most recent <img> start tag has not yet been ended
        self.open_img = False

        # Avoid error about multiple top-level elements
        self._builder.start("", dict())

    def close(self):
        """Finish parsing and return the synthetic root element."""
        super().close()
        self._end_open_img()
        return self._builder.close()

    def handle_starttag(self, tag, attrs):
        self._end_open_img()
        self._builder.start(tag, dict(attrs))
        self.open_img = tag == 'img'

    def handle_endtag(self, tag):
        if self.open_img and tag == 'img':
            # An explicit </img> or <img/> ends the element itself.  The
            # flag must still be cleared, otherwise the next event would
            # emit a second end and close the parent element.
            self.open_img = False
        else:
            self._end_open_img()
        self._builder.end(tag)

    def handle_data(self, *pos, **kw):
        self._end_open_img()
        self._builder.data(*pos, **kw)

    def _end_open_img(self):
        """End the pending <img> element, if there is one."""
        if self.open_img:
            self._builder.end('img')
            self.open_img = False
if __name__ == '__main__':
    # Only needed when run as a script.  Presumably clifunc builds the
    # command line interface from main() -- confirm against clifunc.
    import clifunc
    clifunc.run()