Silence the DeprecationWarning raised by importing mimetools in BaseHTTPServer.
[python.git] / Lib / mimetools.py
blob097eda4a75bd2fcbd4fef71a6c6e223594a6094c
1 """Various tools used by MIME-reading or MIME-writing programs."""
import os
import tempfile
# NOTE(review): this pulls a helper out of the ``test`` package, so importing
# this module depends on the test suite being installed -- confirm that is
# acceptable for library code.
from test.test_support import catch_warning
from warnings import filterwarnings
# Ignore the DeprecationWarning whose message matches
# ".*rfc822 has been removed" while rfc822 is being imported.
# catch_warning() presumably restores the previous warning filters when the
# block exits -- confirm against test.test_support.
with catch_warning(record=False):
    filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning)
    import rfc822

from warnings import warnpy3k
# stacklevel=2 attributes the warning to the code importing this module
# rather than to this line.
warnpy3k("in 3.x, mimetools has been removed in favor of the email package",
         stacklevel=2)

# Names exported by ``from mimetools import *``.
__all__ = ["Message","choose_boundary","encode","decode","copyliteral",
           "copybinary"]
class Message(rfc822.Message):
    """A derived class of rfc822.Message that knows about MIME headers and
    contains some hooks for decoding encoded and multipart messages."""

    def __init__(self, fp, seekable=1):
        """Read the headers from fp, then parse the Content-Type header.

        fp and seekable are passed unchanged to rfc822.Message.__init__.
        """
        rfc822.Message.__init__(self, fp, seekable)
        # Raw header values; the methods below treat None as "header absent".
        self.encodingheader = self.getheader('content-transfer-encoding')
        self.typeheader = self.getheader('content-type')
        self.parsetype()
        self.parseplist()

    def parsetype(self):
        """Split the Content-Type value into type, maintype and subtype.

        A missing header is treated as 'text/plain'.  Everything from the
        first ';' onwards is stored, unparsed, in self.plisttext.
        """
        ctype = self.typeheader
        if ctype is None:
            ctype = 'text/plain'
        if ';' in ctype:
            i = ctype.index(';')
            self.plisttext = ctype[i:]
            ctype = ctype[:i]
        else:
            self.plisttext = ''
        fields = [field.strip().lower() for field in ctype.split('/')]
        self.type = '/'.join(fields)
        self.maintype = fields[0]
        self.subtype = '/'.join(fields[1:])

    def parseplist(self):
        """Parse self.plisttext into self.plist, a list of 'name=value' strings.

        Parameter names are lower-cased and whitespace around the name and
        the value is stripped; values keep their quotes (getparam() removes
        them).  A ';' inside a double-quoted value does not end the parameter.
        """
        text = self.plisttext
        self.plist = []
        while text[:1] == ';':
            text = text[1:]
            end = text.find(';')
            # An odd number of unescaped double quotes before the ';' means
            # the ';' lies inside a quoted value: look for the next one.
            while end > 0 and (text.count('"', 0, end) -
                               text.count('\\"', 0, end)) % 2:
                end = text.find(';', end + 1)
            if end < 0:
                end = len(text)
            param = text[:end]
            if '=' in param:
                i = param.index('=')
                param = (param[:i].strip().lower() +
                         '=' + param[i+1:].strip())
            self.plist.append(param.strip())
            text = text[end:]

    def getplist(self):
        """Return the list of parsed Content-Type parameters."""
        return self.plist

    def getparam(self, name):
        """Return the unquoted value of parameter name, or None if absent.

        The lookup is case-insensitive in name; the first match wins.
        """
        prefix = name.lower() + '='
        for param in self.plist:
            if param.startswith(prefix):
                return rfc822.unquote(param[len(prefix):])
        return None

    def getparamnames(self):
        """Return the lower-cased names of all 'name=value' parameters."""
        return [param[:param.find('=')].lower()
                for param in self.plist if '=' in param]

    def getencoding(self):
        """Return the lower-cased Content-Transfer-Encoding.

        Returns '7bit' when the header is absent.
        """
        if self.encodingheader is None:
            return '7bit'
        return self.encodingheader.lower()

    def gettype(self):
        """Return the full content type, e.g. 'text/plain'."""
        return self.type

    def getmaintype(self):
        """Return the part of the content type before the first '/'."""
        return self.maintype

    def getsubtype(self):
        """Return the part of the content type after the first '/'."""
        return self.subtype
103 # Utility functions
104 # -----------------
# Lock guarding _counter.  dummy_thread is used when the thread module
# cannot be imported.
try:
    import thread
except ImportError:
    import dummy_thread as thread
_counter_lock = thread.allocate_lock()
# The module was only needed to create the lock; drop the name again.
del thread

# Incremented by _get_next_counter() on every call.
_counter = 0
def _get_next_counter():
    """Increment the module-wide counter and return its new value.

    The increment and the read happen while _counter_lock is held, so
    concurrent callers each receive a distinct value.
    """
    global _counter
    # The with statement releases the lock even if an exception (for
    # example KeyboardInterrupt) is raised while it is held.
    with _counter_lock:
        _counter += 1
        return _counter
# Cached "<host ip>.<uid>.<pid>" part of the boundary; computed on first use.
_prefix = None

def choose_boundary():
    """Return a string usable as a multipart boundary.

    The string chosen is unique within a single program run, and
    incorporates the host address, the user id (if available), process id
    (if available), and current time.  So it's very unlikely the returned
    string appears in message text, but there's no guarantee.

    The boundary contains dots so you have to quote it in the header."""

    global _prefix
    import time
    if _prefix is None:
        import socket
        try:
            hostid = socket.gethostbyname(socket.gethostname())
        except socket.error:
            # socket.error is the base class of gaierror and herror, so any
            # resolver failure falls back to the loopback address instead of
            # propagating to the caller.
            hostid = '127.0.0.1'
        try:
            uid = repr(os.getuid())
        except AttributeError:
            # os.getuid is missing on this platform.
            uid = '1'
        try:
            pid = repr(os.getpid())
        except AttributeError:
            # os.getpid is missing on this platform.
            pid = '1'
        _prefix = hostid + '.' + uid + '.' + pid
    return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())
154 # Subroutines for decoding some common content-transfer-types
def decode(input, output, encoding):
    """Decode common content-transfer-encodings (base64, quopri, uuencode).

    input and output are file objects: the encoded data is read from input
    and the decoded data is written to output.  encoding is compared
    exactly as given, so callers pass the lower-cased name.

    '7bit' and '8bit' copy the data through unchanged.  An encoding that is
    not handled directly but has an entry in decodetab is piped through the
    external command listed there.

    Raises ValueError for any other encoding.
    """
    if encoding == 'base64':
        import base64
        return base64.decode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        return quopri.decode(input, output)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.decode(input, output)
    if encoding in ('7bit', '8bit'):
        return output.write(input.read())
    if encoding in decodetab:
        pipethrough(input, decodetab[encoding], output)
    else:
        raise ValueError(
            'unknown Content-Transfer-Encoding: %s' % encoding)
def encode(input, output, encoding):
    """Encode common content-transfer-encodings (base64, quopri, uuencode).

    input and output are file objects: the raw data is read from input and
    the encoded data is written to output.  encoding is compared exactly as
    given, so callers pass the lower-cased name.

    '7bit' and '8bit' copy the data through unchanged.  An encoding that is
    not handled directly but has an entry in encodetab is piped through the
    external command listed there.

    Raises ValueError for any other encoding.
    """
    if encoding == 'base64':
        import base64
        return base64.encode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        # Third argument is quotetabs; 0 leaves embedded tabs and spaces
        # unencoded.
        return quopri.encode(input, output, 0)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.encode(input, output)
    if encoding in ('7bit', '8bit'):
        return output.write(input.read())
    if encoding in encodetab:
        pipethrough(input, encodetab[encoding], output)
    else:
        raise ValueError(
            'unknown Content-Transfer-Encoding: %s' % encoding)
194 # The following is no longer used for standard encodings
196 # XXX This requires that uudecode and mmencode are in $PATH
# Shell pipeline that uudecodes its standard input to standard output: the
# "begin" line is rewritten so uudecode writes to $TEMP, which is then
# printed and removed.
# NOTE(review): $TEMP is a predictable name under /tmp (only the shell's
# process id varies) -- review before running this on a shared machine.
uudecode_pipe = '''(
TEMP=/tmp/@uu.$$
sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
cat $TEMP
rm $TEMP
)'''

# Content-Transfer-Encoding name -> shell command that decodes stdin to
# stdout.
decodetab = {
        'uuencode':             uudecode_pipe,
        'x-uuencode':           uudecode_pipe,
        'uue':                  uudecode_pipe,
        'x-uue':                uudecode_pipe,
        'quoted-printable':     'mmencode -u -q',
        'base64':               'mmencode -u -b',
}

# Content-Transfer-Encoding name -> shell command that encodes stdin to
# stdout.
encodetab = {
        'x-uuencode':           'uuencode tempfile',
        'uuencode':             'uuencode tempfile',
        'x-uue':                'uuencode tempfile',
        'uue':                  'uuencode tempfile',
        'quoted-printable':     'mmencode -q',
        'base64':               'mmencode -b',
}
def pipeto(input, command):
    """Copy input, line by line, to the standard input of shell command.

    command is passed to os.popen() and is therefore run by the shell.
    """
    pipe = os.popen(command, 'w')
    try:
        copyliteral(input, pipe)
    finally:
        # Close the pipe even when reading input or writing to it fails.
        pipe.close()
def pipethrough(input, command, output):
    """Run shell command with input as its stdin; copy its stdout to output.

    input is first spooled, line by line, into a temporary file, which is
    redirected into the command.  The temporary file is removed afterwards,
    also when copying or running the command raises.
    """
    (fd, tempname) = tempfile.mkstemp()
    try:
        temp = os.fdopen(fd, 'w')
        try:
            copyliteral(input, temp)
        finally:
            temp.close()
        pipe = os.popen(command + ' <' + tempname, 'r')
        try:
            copybinary(pipe, output)
        finally:
            pipe.close()
    finally:
        os.unlink(tempname)
def copyliteral(input, output):
    """Copy input to output one line at a time until readline() returns
    an empty result."""
    line = input.readline()
    while line:
        output.write(line)
        line = input.readline()
def copybinary(input, output):
    """Copy input to output in chunks of at most BUFSIZE bytes until
    read() returns an empty result."""
    BUFSIZE = 8192
    chunk = input.read(BUFSIZE)
    while chunk:
        output.write(chunk)
        chunk = input.read(BUFSIZE)