Quick binary expression handling for “test_token_buffering“
[vadmium-streams.git] / httpfs.py
blob01c802a1d62c51bb6a9f91556692324d3a53ccaa
1 #! /usr/bin/env python3
3 import sys
4 from http.client import HTTP_PORT, HTTPS_PORT
5 from http import HTTPStatus
6 from socket import create_connection, IPPROTO_TCP, TCP_NODELAY
7 import selectors
8 import fuse
9 import ssl
10 import urllib.parse
11 from html.parser import HTMLParser
12 import hashlib
13 from errno import ENOENT, EIO
14 from os import fsencode
15 import string
16 import email.utils
17 from datetime import timezone
18 from math import inf
19 from select import select
20 from contextlib import ExitStack
def main():
    # Command line: the URL to expose, an optional mount name, plus flags.
    parser = fuse.ArgumentParser("url {mnt,''}", '')
    parser.add_argument('-v',
        action='store_true', help='verbose messages')
    parser.add_argument('--false-size', action='store_true',
        help='report large file size until actual size is read')
    parser.add_argument('--timeout-secs')
    [url, name, args] = parser.parse_args()
    url = encode_url(url)
    # Split scheme://host/path.  A missing path or a trailing slash marks
    # the root as a directory; anything else is treated as a single file.
    [scheme, suffix] = url.split('://', 1)
    [host, sep, path] = suffix.partition('/')
    type = fuse.Filesystem.DT_DIR
    if not path:
        default = host
    elif path.endswith('/'):
        default = path[:-1].rsplit('/', 1)[-1]
    else:
        default = path.rsplit('/', 1)[-1]
        type = fuse.Filesystem.DT_REG
    if not name:
        # Derive the mount name from the last URL component and announce it
        name = urllib.parse.unquote(default)
        print(name)
    self = Filesystem(scheme, host, path, name, type, args)
    with self.cleanup:
        self.mount("httpfs", url)
        # Single-threaded event loop: each registered file object stores
        # its handler callback as the selector key's "data" field.
        self.selector = selectors.DefaultSelector()
        self.cleanup.callback(self.selector.close)
        self.select_timeout = None
        self.selector.register(self.fuse, selectors.EVENT_READ,
            self.handle_request)
        while True:
            ready = self.selector.select(self.select_timeout)
            for [key, events] in ready:
                # NOTE(review): get_key() re-validates the registration; a
                # handler run earlier in this loop may have unregistered
                # key.fileobj, in which case this raises KeyError.  The
                # commented-out scaffold below shows a tolerant variant
                # the author considered but left disabled.
                #~ try:
                current = self.selector.get_key(key.fileobj)
                #~ except KeyError:
                #~     continue
                #~ if not current.events & key.events:
                #~     continue
                key.data()
            if not ready:
                # select() timed out: deliver a timeout error to whatever
                # handler is registered for the HTTP connection.
                self.selector.get_key(self.conn).data(EOFError('Timed out'))
class Filesystem(fuse.Filesystem):
    """Read-only filesystem backed by a single HTTP(S) host.

    Each FUSE inode maps to a URL path on the host.  Directory listings
    are scraped from HTML index pages; file data is fetched with ranged
    GET requests over one keep-alive connection.  Body transfers for
    reads run asynchronously through the selector loop in main().
    """

    def __init__(self, scheme, host, path, name, type, args):
        super().__init__(name, args)
        self.v = args.v                     # verbose flag
        self.false_size = args.false_size
        # FIX: --timeout-secs is optional; argparse leaves it as None and
        # float(None) raises TypeError.  None means "no timeout" and is
        # passed through (selector.select(None) blocks indefinitely).
        if args.timeout_secs is None:
            self.timeout_secs = None
        else:
            self.timeout_secs = float(args.timeout_secs)
        self.scheme = scheme
        self.host = host
        # inode -> {'path': URL path, 'attrs': {'type': ..., 'size': ...}}
        self.nodes = {fuse.ROOT_ID: {'path': path, 'attrs': {'type': type}}}
        self.conn = None  # current HTTP(S) socket, if any
        self.cleanup = ExitStack()
        self.cleanup.callback(self.close)
        if type == fuse.Filesystem.DT_DIR:
            self.make_dir()
            # Offset into child URLs where the variable part starts (the
            # root path up to any query string); used for inode hashing
            self.root_len = max(path.find('?'), 0)
        else:
            self.make_file()
            if self.false_size:
                # Report a huge size until the real one is discovered,
                # so the kernel keeps issuing reads
                self.nodes[fuse.ROOT_ID]['attrs']['size'] = fuse.MAX_SIZE

    def close(self):
        """Close the HTTP connection, then the FUSE channel."""
        try:
            if self.conn is not None:
                self.conn.close()
        finally:
            super().close()

    def handle_request(self):
        """Dispatch one FUSE request.  While a read is in flight, stop
        watching the FUSE channel so further requests queue up."""
        unique = fuse.Filesystem.handle_request(self)
        if unique is not None:
            self.read_unique = unique
            # Changing events to 0 raises ValueError
            self.selector.unregister(self.fuse)

    def statfs(fs):
        # Minimal statfs reply: only the filename length limit matters
        return dict(
            blocks=0,
            namelen=len(fs.name),
        )

    def getattr(self, node):
        """Return the attributes of *node*; for a regular file whose size
        is unknown, issue a HEAD request to learn size and mtime."""
        node = self.nodes[node]
        attrs = node['attrs']
        if attrs['type'] == fuse.Filesystem.DT_REG and 'size' not in attrs:
            try:
                # Only reuse an idle connection with no pending input;
                # unexpected input (or EOF) means the server gave up on it
                if self.conn is None or not self.conn_idle \
                        or select((self.conn,), (), (), 0.0)[0]:
                    raise EOFError()
                self.request(b'HEAD', node['path'])
                buf = Buffer(self.conn.recv_into, 0x10000)
                header = buf.read_until(b'\r\n\r\n')
            except (EOFError, ConnectionError):
                # Stale or missing connection: reconnect and retry once
                if self.conn is not None:
                    self.conn.close()
                self.conn = None
                self.open_conn()
                self.request(b'HEAD', node['path'])
                buf = Buffer(self.conn.recv_into, 0x10000)
                header = buf.read_until(b'\r\n\r\n')
            status = get_http_status(header)
            conn = get_list_field(header, b'Connection')
            self.conn_idle = b'close' not in conn
            if status != HTTPStatus.OK:
                raise ValueError(f'HTTP {status} response')
            try:
                attrs['size'] = int(get_field(header, b'Content-Length'))
            except ValueError:
                print('HEAD response without Content-Length')
            mtime = get_field(header, b'Last-Modified').decode('ascii')
            mtime = email.utils.parsedate_to_datetime(mtime)
            if mtime.tzinfo is None:
                # HTTP dates are GMT when no zone was parsed
                mtime = mtime.replace(tzinfo=timezone.utc)
            attrs['mtime'] = int(mtime.timestamp())
        return attrs

    def readdir(self, dir, offset):
        """Yield (inode, name, type, next-offset) entries starting at
        *offset* within directory inode *dir*."""
        dir = self.get_dir(self.nodes[dir]).items()
        for [i, [name, node]] in enumerate(dir):
            if i < offset:
                continue
            yield (node, name, self.nodes[node]['attrs']['type'], i + 1)

    def lookup(self, dir, name):
        """Resolve *name* inside directory inode *dir*; ENOENT if absent."""
        dir = self.get_dir(self.nodes[dir])
        if name not in dir:
            raise OSError(ENOENT, None)
        return dir[name]

    def get_dir(self, dir):
        """Return the {name: inode} mapping for *dir*, fetching and
        parsing its HTML index page on first access (cached in
        dir['dir'])."""
        if 'dir' not in dir:
            try:
                if self.conn is None or not self.conn_idle \
                        or select((self.conn,), (), (), 0.0)[0]:
                    raise EOFError()
                self.request(b'GET', dir['path'], b'Accept: text/html\r\n')
                buf = Buffer(self.conn.recv_into, 0x10000)
                header = buf.read_until(b'\r\n\r\n')
            except (EOFError, ConnectionError):
                if self.conn is not None:
                    self.conn.close()
                self.conn = None
                self.open_conn()
                self.request(b'GET', dir['path'], b'Accept: text/html\r\n')
                buf = Buffer(self.conn.recv_into, 0x10000)
                header = buf.read_until(b'\r\n\r\n')
            status = get_http_status(header)
            if status != HTTPStatus.OK:
                raise ValueError(f'HTTP {status} response')
            ctype = get_field(header, b'Content-Type')
            if ctype.split(b';', 1)[0].strip(b' \t').lower() != b'text/html':
                raise TypeError(repr(ctype))
            index = IndexParser()
            encoding = get_list_field(header, b'Transfer-Encoding')
            if tuple(encoding) == (b'chunked',):
                # Decode chunked transfer coding: hex size line, data,
                # CRLF, repeated until a zero-size chunk, then trailers
                while True:
                    chunk = buf.read_until(b'\r\n').rstrip(b'\r\n')
                    chunk = int(chunk.split(b';', 1)[0], 16)
                    if chunk == 0:
                        break
                    buf.copy(chunk, index.feed)
                    # Make sure the CRLF terminating the chunk data is in
                    # the buffer, then drop it
                    while buf.filled < 2:
                        view = memoryview(buf.buf)[buf.filled:]
                        buf.filled += buf.fill(view)
                    if not buf.buf.startswith(b'\r\n'):
                        raise ValueError('Expected CRLF at end of chunk')
                    # FIX: bound the source slice by "filled" (as the
                    # Buffer class itself does); the unbounded buf.buf[2:]
                    # grew the bytearray by the unused tail on every chunk
                    buf.buf[ : buf.filled - 2] = buf.buf[2:buf.filled]
                    buf.filled -= 2
                # Discard trailer fields up to the blank line
                while buf.read_until(b'\r\n') != b'\r\n':
                    pass
            else:
                length = int(get_field(header, b'Content-Length'))
                buf.copy(length, index.feed)
            conn = get_list_field(header, b'Connection')
            self.conn_idle = b'close' not in conn
            index.close()

            dir['dir'] = dict()
            for [file, [ntype, url]] in index.files.items():
                url = dir['path'].split('?', 1)[0] + url
                burl = url.encode('ascii')
                if len(url) - self.root_len <= 10:
                    # Short paths get a stable small inode computed as a
                    # base-84 number from the path bytes
                    hash = fuse.ROOT_ID
                    place = 1
                    for d in burl[self.root_len:].translate(self.URL_TRANS):
                        hash += d * place
                        place *= 84
                else:
                    # Longer paths: first 64 bits of the SHA-256 digest
                    hash = hashlib.sha256(burl).digest()
                    hash = int.from_bytes(hash[:64//8], 'big')
                if hash in self.nodes:
                    continue  # already known (or inode collision): skip
                dir['dir'][fsencode(file)] = hash
                self.nodes[hash] = {'path': url, 'attrs': {'type': ntype}}
                if ntype == fuse.Filesystem.DT_REG and self.false_size:
                    self.nodes[hash]['attrs']['size'] = fuse.MAX_SIZE
        return dir['dir']

    # Maps the characters permitted in a URL onto the digit values 0..83
    # for the base-84 inode numbering of short paths
    URL_TRANS = bytes.maketrans(
        f"%{string.ascii_letters}{string.digits}-._~!$&'()*+,;=/:?@[]"
        .encode('ascii'), bytes(range(84)) )

    def read(self, nodeid, offset, size):
        """Begin (or continue) a read of *size* bytes at *offset*.

        Returns b'' past EOF, the data itself when it is already
        buffered, or Ellipsis when the reply will be delivered later by
        the selector callbacks.
        """
        if offset >= self.nodes[nodeid]['attrs'].get('size', inf):
            return b''
        if self.conn is not None:
            # Unexpected input on an idle connection means it went stale
            if self.conn_idle and select((self.conn,), (), (), 0.0)[0]:
                self.conn_idle = False
            # A busy connection is only good for continuing a sequential
            # read of the same node; otherwise drop it
            if not self.conn_idle \
                    and (offset != self.conn_offset or nodeid != self.conn_node):
                self.conn.close()
                self.conn = None
        self.conn_fresh = self.conn is None
        if self.conn_fresh:
            self.open_conn()
        self.offset = offset
        self.buffer = Buffer(None, size)
        if self.conn_idle:
            self.conn_node = nodeid
            self.conn_offset = None
            self.start_recv()
        elif self.read_resp():
            # Enough body bytes were already buffered to answer now
            return self.finish_read()
        else:
            self.selector.register(self.conn, selectors.EVENT_READ,
                self.recv_body)
            self.select_timeout = self.timeout_secs
        return ...

    def open_conn(self):
        """Open a fresh TCP connection to the host, wrapped in TLS for
        https."""
        port = {'http': HTTP_PORT, 'https': HTTPS_PORT}[self.scheme]
        self.conn = create_connection((self.host, port))
        self.conn_idle = True
        self.conn.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
        if self.scheme != 'http':
            self.conn = ssl.create_default_context().wrap_socket(self.conn,
                suppress_ragged_eofs=False, server_hostname=self.host)
        self.conn_offset = None

    def request(self, method, target, fields=b''):
        """Send one HTTP/1.1 request; *fields* holds extra CRLF-terminated
        header lines (as bytes)."""
        self.conn_idle = False
        with self.conn.makefile('wb') as writer:
            writer.write(method)
            writer.write(b' /')
            writer.write(target.encode('ascii'))
            writer.write(b' HTTP/1.1\r\n'
                b'Host: ')
            writer.write(self.host.encode('ascii'))
            writer.write(b'\r\n')
            writer.write(fields)
            writer.write(b'User-Agent: httpfs\r\n'
                b'\r\n')

    def start_recv(self):
        """Issue a ranged GET for the pending read and register the
        header-receiving callback."""
        range = b'Range: bytes=%d-\r\n' % (self.offset + self.buffer.filled)
        self.request(b'GET', self.nodes[self.conn_node]['path'], range)
        self.resp = Buffer(self.conn.recv_into, 0x10000)
        self.selector.register(self.conn, selectors.EVENT_READ,
            self.recv_header)
        self.select_timeout = self.timeout_secs

    def recv_header(self, exc=None):
        """Selector callback: accumulate the response header; once it is
        complete, start (or finish) the body transfer.  *exc* carries a
        timeout injected by the main loop."""
        try:
            if self.v and not self.resp.filled:
                sys.stderr.write('Starting to receive HTTP response\n')
            # Resume the terminator search just before the new data, in
            # case b'\r\n\r\n' straddles two receives
            searched = max(self.resp.filled - 3, 0)
            if self.recv(self.resp, exc=exc) is None:
                return  # reconnected and retried; wait for new response
            end = self.resp.buf.find(b'\r\n\r\n', searched, self.resp.filled)
            if end < 0:
                assert self.resp.filled < len(self.resp.buf)
                return  # header incomplete; wait for more data

            header = self.resp.buf[ : end + 2]
            status = get_http_status(header)
            if status == HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE:
                # Range starts at or past EOF: report end of file
                self.conn_offset = None
                self.reply_read(b'')
                return
            # 200 is acceptable only for a read from the start of the file
            if status != HTTPStatus.PARTIAL_CONTENT \
                    and (status != HTTPStatus.OK
                        or self.offset + self.buffer.filled > 0):
                msg = header.split(b'\r\n', 1)[0]
                raise Exception(msg.decode('ascii'))
            assert not tuple(get_list_field(header, b'Transfer-Encoding'))
            self.resp_len = int(get_field(header, b'Content-Length'))
            conn = get_list_field(header, b'Connection')
            self.keepalive = b'close' not in conn

            # Trim to the received bytes, then drop the header so resp.buf
            # holds only body data
            del self.resp.buf[self.resp.filled:]
            del self.resp.buf[ : end + 4]
            if not self.read_resp():
                self.selector.modify(self.conn, selectors.EVENT_READ,
                    self.recv_body)
                return
        except Exception as exc:
            sys.excepthook(type(exc), exc, exc.__traceback__)
            self.reply_error(self.read_unique, EIO)
            if self.conn:
                self.selector.unregister(self.conn)
            self.select_timeout = None
            self.selector.register(self.fuse, selectors.EVENT_READ,
                self.handle_request)
            return
        self.reply_read(self.finish_read())

    def read_resp(self):
        """Move received body bytes from resp into the read buffer; return
        True once the body is exhausted or the read is satisfied."""
        chunk = min(len(self.buffer.buf) - self.buffer.filled, self.resp_len)
        chunk = self.resp.buf[:chunk]
        del self.resp.buf[:len(chunk)]
        self.buffer.buf[self.buffer.filled : self.buffer.filled + len(chunk)] \
            = chunk
        self.buffer.filled += len(chunk)
        self.resp_len -= len(chunk)
        return self.resp_len == 0 or self.buffer.filled == len(self.buffer.buf)

    def recv_body(self, exc=None):
        """Selector callback: receive body data directly into the read
        buffer until the read is satisfied or the body ends."""
        try:
            # The buffer was created with fill=None; bind the socket now
            self.buffer.fill = self.conn.recv_into
            chunk = self.recv(self.buffer,
                self.buffer.filled + self.resp_len, exc=exc)
            if chunk is None:
                return  # reconnected; the retry restarts from the header
            self.resp_len -= chunk
            if self.resp_len > 0 and self.buffer.filled < len(self.buffer.buf):
                return  # more to come
        except Exception as exc:
            sys.excepthook(type(exc), exc, exc.__traceback__)
            self.reply_error(self.read_unique, EIO)
            if self.conn:
                self.selector.unregister(self.conn)
            self.select_timeout = None
            self.selector.register(self.fuse, selectors.EVENT_READ,
                self.handle_request)
            return
        self.reply_read(self.finish_read())

    def recv(self, buf, limit=None, *, exc=None):
        """Receive once into *buf*.  On a dropped connection, reconnect
        and restart the request (returning None) unless this connection
        was already freshly opened, in which case the error propagates."""
        try:
            if exc:
                sys.stderr.write('Receive timed out\n')
                raise exc
            return buf.recv(limit)
        except (ConnectionError, EOFError):
            if self.conn_fresh:
                raise
            self.selector.unregister(self.conn)
            self.conn.close()
            self.conn = None
            self.open_conn()
            self.conn_fresh = True
            self.start_recv()
            return None

    def finish_read(self):
        """Finalize a read: record the new sequential offset, update the
        file size at EOF, and return the trimmed data buffer."""
        self.conn_offset = self.offset + self.buffer.filled
        if self.resp_len == 0:
            self.conn_idle = self.keepalive
            # Only correct the size once the EOF has been read, otherwise
            # if the kernel discovers the changed attribute, it seems to
            # invalidate its cache of data already read (incl. readahead)
            self.nodes[self.conn_node]['attrs']['size'] = self.conn_offset
        del self.buffer.buf[self.buffer.filled:]
        return self.buffer.buf

    def reply_read(self, buffer):
        """Send the read reply and switch the selector back to watching
        the FUSE channel."""
        self.reply(self.read_unique, buffer)

        self.selector.unregister(self.conn)
        self.select_timeout = None
        self.selector.register(self.fuse, selectors.EVENT_READ,
            self.handle_request)
class IndexParser(HTMLParser):
    """Collect the file and directory links of an HTML directory index.

    After feeding the page, ``files`` maps each entry name to a
    ``(type, encoded relative URL)`` pair.
    """

    def __init__(self):
        super().__init__()
        self.files = dict()

    def feed(self, data):
        # The HTML parser wants text: treat the page as ASCII, replacing
        # undecodable bytes rather than failing
        return super().feed(data.decode('ascii', 'replace'))

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            for [attr, value] in attrs:
                # Only relative links without fragments are index entries
                if attr != 'href' or '#' in value or value.startswith('/'):
                    continue
                entry = value.split('?', 1)[0]
                if entry.endswith('/'):
                    entry = entry[:-1]
                    entry_type = fuse.Filesystem.DT_DIR
                else:
                    entry_type = fuse.Filesystem.DT_REG
                if not frozenset(':/').isdisjoint(entry):
                    continue  # absolute URL or nested path: not an entry
                entry = urllib.parse.unquote(entry)
                if entry in '..':
                    continue  # substring test: skips '', '.' and '..'
                self.files[entry] = (entry_type, encode_url(value))
        super().handle_starttag(tag, attrs)
def get_http_status(header):
    """Return the integer status code from an HTTP/1.1 status line,
    raising NotImplementedError for any other protocol version."""
    prefix = b'HTTP/1.1 '
    if header[:len(prefix)] != prefix:
        raise NotImplementedError('Not a HTTP 1.1 response')
    digits = header[len(prefix) : len(prefix) + 3]
    return int(digits)
def get_list_field(header, name):
    """Yield the lower-cased, whitespace-trimmed elements of the
    comma-separated header field *name*; yield nothing when the field is
    absent (get_field raising ValueError)."""
    try:
        value = get_field(header, name)
    except ValueError:
        return
    for item in value.lower().split(b','):
        item = item.strip(b' \t')
        if item:
            yield item
def get_field(header, name):
    """Return the trimmed value of header field *name*.

    Matching is case-insensitive (both sides are title-cased, which
    preserves byte offsets); raises ValueError when the field is absent.
    """
    needle = b'\r\n%b:' % name.title()
    start = header.title().index(needle) + len(needle)
    rest = header[start:]
    return rest.split(b'\r\n')[0].strip(b' \t')
def encode_url(url):
    """Percent-encode characters not allowed in a URL, leaving existing
    escapes ('%') and all URL syntax characters untouched."""
    url_syntax = "%:/?#[]@!$&'()*+,;="
    return urllib.parse.quote(url, safe=url_syntax)
class Buffer:
    """Fixed-capacity byte buffer fed by a recv_into-style callable.

    ``buf`` is the backing bytearray and ``filled`` counts the valid
    bytes at its front; consumed data is shifted out of the front.
    """

    def __init__(self, fill, size):
        self.fill = fill            # callable(view) -> number of bytes read
        self.buf = bytearray(size)
        self.filled = 0

    def read_until(self, end):
        """Receive until the terminator *end* appears; return everything
        up to and including it, removing that prefix from the buffer."""
        start = 0
        while True:
            pos = self.buf.find(end, start, self.filled)
            if pos >= 0:
                break
            if self.filled == len(self.buf):
                raise ValueError('Excessive field size')
            # Re-scan only the tail where a split terminator could sit
            start = max(self.filled - len(end) + 1, 0)
            self.recv()
        pos += len(end)
        head = self.buf[:pos]
        self.buf[ : self.filled - pos] = self.buf[pos:self.filled]
        self.filled -= pos
        return head

    def copy(self, length, feed):
        """Pass exactly *length* bytes (buffered plus newly received) to
        the callable *feed*, consuming them from the buffer."""
        while length > self.filled:
            feed(self.buf[:self.filled])
            length -= self.filled
            self.filled = 0
            self.recv()
        feed(self.buf[:length])
        self.buf[ : self.filled - length] = self.buf[length:self.filled]
        self.filled -= length

    def recv(self, limit=None):
        """Receive once into the free space (up to absolute index
        *limit*); raise EOFError at end of stream, else return the count."""
        with memoryview(self.buf) as whole, whole[self.filled:limit] as free:
            count = self.fill(free)
        if not count:
            raise EOFError()
        self.filled += count
        return count
# Script entry point: mount the filesystem and run the event loop,
# letting the fuse helper translate termination signals into cleanup.
if __name__ == '__main__':
    with fuse.handle_termination():
        main()