Issue #789290: make sure that hash(2**63) == hash(2.**63) on 64-bit
[python.git] / Lib / mimetools.py
blob71ca8f8593f2b64d08ddeb02a38dee1fef827f1c
1 """Various tools used by MIME-reading or MIME-writing programs."""
4 import os
5 import sys
6 import tempfile
7 from warnings import filterwarnings, catch_warnings
8 with catch_warnings():
9 if sys.py3kwarning:
10 filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning)
11 import rfc822
13 from warnings import warnpy3k
14 warnpy3k("in 3.x, mimetools has been removed in favor of the email package",
15 stacklevel=2)
17 __all__ = ["Message","choose_boundary","encode","decode","copyliteral",
18 "copybinary"]
20 class Message(rfc822.Message):
21 """A derived class of rfc822.Message that knows about MIME headers and
22 contains some hooks for decoding encoded and multipart messages."""
24 def __init__(self, fp, seekable = 1):
25 rfc822.Message.__init__(self, fp, seekable)
26 self.encodingheader = \
27 self.getheader('content-transfer-encoding')
28 self.typeheader = \
29 self.getheader('content-type')
30 self.parsetype()
31 self.parseplist()
33 def parsetype(self):
34 str = self.typeheader
35 if str is None:
36 str = 'text/plain'
37 if ';' in str:
38 i = str.index(';')
39 self.plisttext = str[i:]
40 str = str[:i]
41 else:
42 self.plisttext = ''
43 fields = str.split('/')
44 for i in range(len(fields)):
45 fields[i] = fields[i].strip().lower()
46 self.type = '/'.join(fields)
47 self.maintype = fields[0]
48 self.subtype = '/'.join(fields[1:])
50 def parseplist(self):
51 str = self.plisttext
52 self.plist = []
53 while str[:1] == ';':
54 str = str[1:]
55 if ';' in str:
56 # XXX Should parse quotes!
57 end = str.index(';')
58 else:
59 end = len(str)
60 f = str[:end]
61 if '=' in f:
62 i = f.index('=')
63 f = f[:i].strip().lower() + \
64 '=' + f[i+1:].strip()
65 self.plist.append(f.strip())
66 str = str[end:]
68 def getplist(self):
69 return self.plist
71 def getparam(self, name):
72 name = name.lower() + '='
73 n = len(name)
74 for p in self.plist:
75 if p[:n] == name:
76 return rfc822.unquote(p[n:])
77 return None
79 def getparamnames(self):
80 result = []
81 for p in self.plist:
82 i = p.find('=')
83 if i >= 0:
84 result.append(p[:i].lower())
85 return result
87 def getencoding(self):
88 if self.encodingheader is None:
89 return '7bit'
90 return self.encodingheader.lower()
92 def gettype(self):
93 return self.type
95 def getmaintype(self):
96 return self.maintype
98 def getsubtype(self):
99 return self.subtype
104 # Utility functions
105 # -----------------
107 try:
108 import thread
109 except ImportError:
110 import dummy_thread as thread
111 _counter_lock = thread.allocate_lock()
112 del thread
114 _counter = 0
115 def _get_next_counter():
116 global _counter
117 _counter_lock.acquire()
118 _counter += 1
119 result = _counter
120 _counter_lock.release()
121 return result
123 _prefix = None
125 def choose_boundary():
126 """Return a string usable as a multipart boundary.
128 The string chosen is unique within a single program run, and
129 incorporates the user id (if available), process id (if available),
130 and current time. So it's very unlikely the returned string appears
131 in message text, but there's no guarantee.
133 The boundary contains dots so you have to quote it in the header."""
135 global _prefix
136 import time
137 if _prefix is None:
138 import socket
139 try:
140 hostid = socket.gethostbyname(socket.gethostname())
141 except socket.gaierror:
142 hostid = '127.0.0.1'
143 try:
144 uid = repr(os.getuid())
145 except AttributeError:
146 uid = '1'
147 try:
148 pid = repr(os.getpid())
149 except AttributeError:
150 pid = '1'
151 _prefix = hostid + '.' + uid + '.' + pid
152 return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())
155 # Subroutines for decoding some common content-transfer-types
157 def decode(input, output, encoding):
158 """Decode common content-transfer-encodings (base64, quopri, uuencode)."""
159 if encoding == 'base64':
160 import base64
161 return base64.decode(input, output)
162 if encoding == 'quoted-printable':
163 import quopri
164 return quopri.decode(input, output)
165 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
166 import uu
167 return uu.decode(input, output)
168 if encoding in ('7bit', '8bit'):
169 return output.write(input.read())
170 if encoding in decodetab:
171 pipethrough(input, decodetab[encoding], output)
172 else:
173 raise ValueError, \
174 'unknown Content-Transfer-Encoding: %s' % encoding
176 def encode(input, output, encoding):
177 """Encode common content-transfer-encodings (base64, quopri, uuencode)."""
178 if encoding == 'base64':
179 import base64
180 return base64.encode(input, output)
181 if encoding == 'quoted-printable':
182 import quopri
183 return quopri.encode(input, output, 0)
184 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
185 import uu
186 return uu.encode(input, output)
187 if encoding in ('7bit', '8bit'):
188 return output.write(input.read())
189 if encoding in encodetab:
190 pipethrough(input, encodetab[encoding], output)
191 else:
192 raise ValueError, \
193 'unknown Content-Transfer-Encoding: %s' % encoding
195 # The following is no longer used for standard encodings
197 # XXX This requires that uudecode and mmencode are in $PATH
199 uudecode_pipe = '''(
200 TEMP=/tmp/@uu.$$
201 sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
202 cat $TEMP
203 rm $TEMP
204 )'''
206 decodetab = {
207 'uuencode': uudecode_pipe,
208 'x-uuencode': uudecode_pipe,
209 'uue': uudecode_pipe,
210 'x-uue': uudecode_pipe,
211 'quoted-printable': 'mmencode -u -q',
212 'base64': 'mmencode -u -b',
215 encodetab = {
216 'x-uuencode': 'uuencode tempfile',
217 'uuencode': 'uuencode tempfile',
218 'x-uue': 'uuencode tempfile',
219 'uue': 'uuencode tempfile',
220 'quoted-printable': 'mmencode -q',
221 'base64': 'mmencode -b',
224 def pipeto(input, command):
225 pipe = os.popen(command, 'w')
226 copyliteral(input, pipe)
227 pipe.close()
229 def pipethrough(input, command, output):
230 (fd, tempname) = tempfile.mkstemp()
231 temp = os.fdopen(fd, 'w')
232 copyliteral(input, temp)
233 temp.close()
234 pipe = os.popen(command + ' <' + tempname, 'r')
235 copybinary(pipe, output)
236 pipe.close()
237 os.unlink(tempname)
239 def copyliteral(input, output):
240 while 1:
241 line = input.readline()
242 if not line: break
243 output.write(line)
245 def copybinary(input, output):
246 BUFSIZE = 8192
247 while 1:
248 line = input.read(BUFSIZE)
249 if not line: break
250 output.write(line)