Move license to LICENSE
[maliang.git] / maliang.py
blob62266e777c60d38e00098aeafb9abfbc3eb0469b
1 import cgi
2 import collections
3 import http.server
4 import os
5 import pickle
6 import secrets
7 import socket
8 import socketserver
9 import threading
10 from typing import Any, IO, Tuple
# Canned HTTP reply: a 200 status with an empty body that asks the peer to close.
# The request handler writes it verbatim and the client reads back exactly
# len(_END_MESSAGE) bytes.
_END_MESSAGE = (
    b'HTTP/1.1 200 OK\r\n'
    b'Content-Length: 0\r\n'
    b'Connection: close\r\n'
    b'\r\n'
)
class _ServerRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler that unpickles the ``pickle`` form field of a POST request.

    The decoded object is appended to ``self.server.results``; the owning
    server is expected to be a ``_CustomTCPServer`` living in this process.
    """

    # noinspection PyPep8Naming
    # Not me!
    def do_POST(self):
        """Acknowledge the request immediately, then parse and store the payload.

        Raises:
            KeyError: if the multipart body has no field named ``pickle``.
            AssertionError: if the server is not a ``_CustomTCPServer`` or was
                created by a different process.
        """
        # finish this up as soon as possible; the sender is possibly an interactive shell
        self.wfile.write(_END_MESSAGE)

        # Hopefully everything is streamed.
        # NOTE(review): the ``cgi`` module is deprecated by PEP 594 -- confirm
        # which Python versions this file must support.
        fs = cgi.FieldStorage(fp=self.rfile,
                              headers=self.headers,
                              environ={
                                  'REQUEST_METHOD': 'POST'
                              })

        # SECURITY: unpickling runs arbitrary code chosen by the sender. Only
        # expose this server to trusted peers (the default bind is loopback).
        obj = pickle.load(fs['pickle'].file)

        # Well, a twisted hack. We are running in a single process so it doesn't hurt.
        assert isinstance(self.server, _CustomTCPServer)
        assert os.getpid() == self.server.pid

        self.server.results.append(obj)
class _CustomTCPServer(socketserver.TCPServer):
    """``TCPServer`` that records its creator's PID and collects handler results."""

    def __init__(self, server_address, request_handler_class):
        """Bind the server and set up the per-instance bookkeeping."""
        super().__init__(server_address, request_handler_class)
        # Filled by the request handler, drained by whoever owns the server.
        self.results = collections.deque()
        # PID of the process that constructed this server.
        self.pid = os.getpid()

    def handle_timeout(self) -> None:
        """Defer to the base class; no extra timeout behaviour is added."""
        return super().handle_timeout()
class Server:
    """Receives a single pickled object per ``recv()`` call over a one-shot HTTP server."""

    class NoData:
        """
        In case a None is accidentally transmitted.

        We don't use a single object() for a more pretty and meaningful __repr__ without monkey patching. Using __eq__
        is more foolproof as well.
        """

        def __eq__(self, other):
            # Only other NoData instances are equal. Returning True for
            # everything would make *any* received value compare equal to the
            # sentinel through Python's reflected-equality fallback.
            return isinstance(other, Server.NoData)

        def __hash__(self):
            # All instances are equal, so they must share one hash value.
            return 1

    def __init__(self,
                 listen_addr: str = '127.0.0.1',
                 listen_port: int = 8848):
        """Remember the address and port that ``recv()`` will bind to."""
        self._listen_addr = listen_addr
        self._listen_port = listen_port

    def recv(self) -> Any:
        """Serve exactly one request and return the object it carried.

        Returns:
            The unpickled object, or a ``Server.NoData`` instance if the wait
            was interrupted with Ctrl-C.

        Raises:
            AssertionError: if the handled request did not yield exactly one result.
        """
        try:
            with _CustomTCPServer((self._listen_addr, self._listen_port), _ServerRequestHandler) as httpd:
                httpd.handle_request()

                # We have only one request running.
                assert len(httpd.results) == 1
                return httpd.results[0]
        except KeyboardInterrupt:
            # SIGINT handling. The ``with`` block has already closed the
            # server. Do NOT call httpd.shutdown() here: it waits for a
            # serve_forever() loop that was never started and would deadlock.
            return Server.NoData()
class Client:
    """Sends one object to a listening ``Server`` as a streamed multipart HTTP POST."""

    _USER_AGENT = 'MaLiang/0.0.1'
    _BUFFER_SIZE = 4096

    def __init__(self,
                 server_addr: str = '127.0.0.1',
                 server_port: int = 8848):
        """Remember the address and port that ``send()`` will connect to."""
        self._server_addr = server_addr
        self._server_port = server_port

    def _socket_send_bytes(self, s: socket.socket, buffer: bytes):
        """Write all of ``buffer`` to ``s``, retrying after partial sends.

        Raises:
            RuntimeError: if ``send`` reports that zero bytes were written.
        """
        assert len(buffer) <= self._BUFFER_SIZE

        # Always slice the *original* buffer by the cumulative count; slicing
        # an already-shrunk copy by that count skips bytes on the third send.
        view = memoryview(buffer)
        sent, total = 0, len(buffer)
        while sent < total:
            curr_sent = s.send(view[sent:])
            if curr_sent == 0:
                raise RuntimeError('Connection closed')
            sent += curr_sent

    def _socket_discard_bytes(self, s: socket.socket, length: int):
        """Read and throw away exactly ``length`` bytes from ``s``.

        Raises:
            RuntimeError: if the peer closes before ``length`` bytes arrive.
        """
        assert length <= self._BUFFER_SIZE

        received = 0
        while received < length:
            chunk = s.recv(length - received)
            # recv() returns bytes; an empty result means the peer closed.
            if not chunk:
                raise RuntimeError('Connection closed while discarding bytes')
            received += len(chunk)

    @staticmethod
    def _dump_and_close(obj: Any, file: IO[bytes]):
        """Pickle ``obj`` into ``file`` and close it, even if pickling fails.

        Closing unconditionally guarantees the reading end of the pipe sees
        EOF instead of blocking forever when ``pickle.dump`` raises.
        """
        try:
            pickle.dump(obj, file)
        finally:
            file.close()

    def _send_streamed(self, obj: Any, boundary: str,
                       r_file: IO[bytes], s: socket.socket):
        """
        We use raw sockets since it's pretty much the only option.
        * It does not depend on 3rd party libraries,
        * its streaming sending interface is implemented straightforwardly, and
        * with it we could send an HTTP request without a proper Content-Length easily.

        The request headers, one multipart part whose body is everything read
        from ``r_file``, and the closing delimiter are written to ``s``.
        ``obj`` is unused here; it is kept for interface compatibility.
        """

        name = 'pickle'
        mime_type = 'application/octet-stream'

        # the headers
        headers = ['POST / HTTP/1.1\r\n',
                   'Accept-Encoding: iso-8859-1\r\n',
                   f'Host: {self._server_addr}:{self._server_port}\r\n',
                   f'User-Agent: {self._USER_AGENT}\r\n',
                   f'Content-Type: multipart/form-data; boundary={boundary}\r\n',
                   'Content-Length: 0\r\n',  # TODO
                   'Connection: Close\r\n',
                   '\r\n'
                   ]
        self._socket_send_bytes(s, ''.join(headers).encode('iso-8859-1'))

        # the pickle
        prefixes = [f'--{boundary}\r\n',
                    f'Content-Disposition: form-data; name="{name}"; filename="{name}"\r\n',
                    f'Content-Type: {mime_type}\r\n',
                    '\r\n'
                    ]
        self._socket_send_bytes(s, ''.join(prefixes).encode('iso-8859-1'))

        # Forward the payload in _BUFFER_SIZE chunks until the reader hits EOF.
        for chunk in iter(lambda: r_file.read(self._BUFFER_SIZE), b''):
            self._socket_send_bytes(s, chunk)

        # the footer: the closing delimiter is CRLF + "--" + boundary + "--"
        suffixes = ['\r\n',
                    f'--{boundary}--\r\n'
                    ]
        self._socket_send_bytes(s, ''.join(suffixes).encode('iso-8859-1'))

    def send(self, obj: Any) -> None:
        """Connect to the server, stream ``obj`` as a pickle and wait for the reply.

        The object is pickled on a helper thread into an OS pipe so the
        payload can be forwarded chunk by chunk as it is produced.
        """
        boundary = secrets.token_hex(8)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((self._server_addr, self._server_port))
            # one of the few ways to disable Nagle's algorithm
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)

            rfd, wfd = os.pipe()
            with open(rfd, 'rb') as r_file, open(wfd, 'wb') as w_file:
                dumper = threading.Thread(target=self._dump_and_close, args=(obj, w_file))
                dumper.start()
                self._send_streamed(obj, boundary, r_file, s)
                # Tell the server we've acknowledged the response.
                s.recv(len(_END_MESSAGE))
                dumper.join()