Remove stray ' character in initial message
[urlwatch.git] / lib / urlwatch / handler.py
blobfcb6644db75edcadd180a680bd2c4ef70d11165f
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
4 # urlwatch is a minimalistic URL watcher written in Python
6 # Copyright (c) 2008-2014 Thomas Perl <thp.io/about>
7 # All rights reserved.
9 # Redistribution and use in source and binary forms, with or without
10 # modification, are permitted provided that the following conditions
11 # are met:
12 # 1. Redistributions of source code must retain the above copyright
13 # notice, this list of conditions and the following disclaimer.
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 # 3. The name of the author may not be used to endorse or promote products
18 # derived from this software without specific prior written permission.
20 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
21 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
22 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
23 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
24 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
25 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
29 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 try:
33 # Available in Python 2.5 and above and preferred if available
34 import hashlib
35 have_hashlib = True
36 except ImportError:
37 # "sha" is deprecated since Python 2.5 (throws a warning in Python 2.6)
38 # Thanks to Frank Palvölgyi for reporting the warning in Python 2.6
39 import sha
40 have_hashlib = False
42 import subprocess
43 import email.utils
44 import urllib2
45 import os
46 import stat
47 import sys
48 import re
def get_current_user():
    """Return the login name of the user running this process.

    os.getlogin() is tried first.  It needs a controlling terminal, so when
    urlwatch is launched by cron, or by a systemd.service for example, it
    fails with "OSError: [Errno 25] Inappropriate ioctl for device".  In
    that case the name is looked up in the password database via the
    process's user id instead.
    """
    try:
        login_name = os.getlogin()
    except OSError:
        # Only needed on this fallback path, hence the local import.
        import pwd
        passwd_entry = pwd.getpwuid(os.getuid())
        login_name = passwd_entry.pw_name
    return login_name
class JobBase(object):
    """Base class for jobs that are identified by a location string.

    Subclasses override retrieve() to fetch the current content that
    belongs to self.location.
    """

    def __init__(self, location):
        """Remember the location (as given) that identifies this job."""
        self.location = location

    def __str__(self):
        return self.location

    def get_guid(self):
        """Return the hexadecimal SHA-1 digest of the job's location.

        A unicode location is encoded as UTF-8 before it is hashed, no
        matter which hash module is available, so both code paths yield
        the same digest and neither trips over non-ASCII characters.
        """
        location = self.location
        if isinstance(location, unicode):
            location = location.encode('utf-8')

        if have_hashlib:
            sha_hash = hashlib.new('sha1')
            sha_hash.update(location)
            return sha_hash.hexdigest()

        # Fallback for interpreters without hashlib (before Python 2.5)
        return sha.new(location).hexdigest()

    def retrieve(self, timestamp=None, filter_func=None, headers=None,
                 log=None):
        """Fetch the job's content; must be overridden by subclasses.

        Raises NotImplementedError (a subclass of Exception, so callers
        that catch Exception keep working).
        """
        raise NotImplementedError('Not implemented')
class ShellError(Exception):
    """Exception for shell commands with non-zero exit code

    The exit status passed to the constructor is kept in the ``result``
    attribute and is included in the string form of the exception.
    """

    def __init__(self, result):
        self.result = result
        Exception.__init__(self)

    def __str__(self):
        class_name = self.__class__.__name__
        return '%s: Exit status %d' % (class_name, self.result)
def use_filter(filter_func, url, input):
    """Apply a filter function to input from an URL

    filter_func is called as filter_func(url, input) and its return value
    is returned.  The unfiltered input is returned instead when
    filter_func is None, or when the filter itself returns None.
    """
    if filter_func is None:
        # The retrieve() methods in this file default filter_func to None;
        # treat that as "no filtering" instead of failing with TypeError.
        return input

    output = filter_func(url, input)

    if output is None:
        # If the filter does not return a value, it is
        # assumed that the input does not need filtering.
        # In this case, we simply return the input.
        return input

    return output
class ShellJob(JobBase):
    """Job whose location is a command line that is run through the shell."""

    def retrieve(self, timestamp=None, filter_func=None, headers=None,
                 log=None):
        """Run the command and return its filtered standard output.

        Only stdout is captured; stderr is not redirected.  Raises
        ShellError when the command exits with a non-zero status.  The
        timestamp, headers and log arguments are not used by this job type.
        """
        command = self.location
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        captured = child.communicate()
        exit_status = child.wait()
        if exit_status == 0:
            # captured is (stdout, stderr); stderr is None as it is not piped
            return use_filter(filter_func, command, captured[0])
        raise ShellError(exit_status)
class UrlJob(JobBase):
    """Job that downloads the content behind a URL.

    If the location contains a space, the part before the first space is
    the URL and the remainder is sent as POST data.
    """

    # Picks the charset out of a text/html or text/plain Content-type header
    CHARSET_RE = re.compile('text/(html|plain); charset=([^;]*)')

    def retrieve(self, timestamp=None, filter_func=None, headers=None,
                 log=None):
        """Fetch the URL and return its filtered content as unicode.

        timestamp   -- if not None, seconds since the epoch; sent as the
                       If-Modified-Since request header
        filter_func -- passed on to use_filter() together with the URL
        headers     -- optional mapping (or sequence of pairs) of extra
                       request headers; it is copied, never modified
        log         -- optional logger-like object with an info() method

        Errors raised by urllib2.urlopen() are passed on to the caller.
        """
        # Work on a copy; headers may legitimately be None (the default)
        headers = dict(headers or {})
        if timestamp is not None:
            # HTTP dates must use the literal "GMT" zone, not "-0000"
            headers['If-Modified-Since'] = email.utils.formatdate(
                timestamp, usegmt=True)

        if ' ' in self.location:
            # NOTE: self.location is rewritten to the bare URL here, so the
            # POST data is dropped from the job after the first retrieve()
            self.location, post_data = self.location.split(' ', 1)
            if log is not None:
                log.info('Sending POST request to %s', self.location)
        else:
            post_data = None

        request = urllib2.Request(self.location, post_data, headers)
        response = urllib2.urlopen(request)
        try:
            response_headers = response.info()
            content = response.read()
        finally:
            # Release the connection even if reading the body fails
            response.close()

        encoding = 'utf-8'

        # Determine content type via HTTP headers
        content_type = response_headers.get('Content-type', '')
        content_type_match = self.CHARSET_RE.match(content_type)
        if content_type_match:
            encoding = content_type_match.group(2)

        # Convert from specified encoding to unicode
        if not isinstance(content, unicode):
            try:
                content = content.decode(encoding, 'ignore')
            except LookupError:
                # If this is an invalid encoding, decode as ascii
                # (Debian bug 731931)
                content = content.decode('ascii', 'ignore')

        return use_filter(filter_func, self.location, content)
def _shelljob_permission_errors(path, current_uid):
    """Return a list of reasons why path is unsafe as a shell job source.

    The list is empty when path is writable by its owner only and that
    owner is current_uid.
    """
    errors = []
    path_st = os.stat(path)
    if (path_st.st_mode & (stat.S_IWGRP | stat.S_IWOTH)) != 0:
        errors.append('%s is group/world-writable' % path)
    if path_st.st_uid != current_uid:
        errors.append('%s not owned by %s' % (path, get_current_user()))
    return errors


def parse_urls_txt(urls_txt):
    """Parse the URL list file urls_txt and return a list of jobs.

    Blank lines and lines whose first non-blank character is '#' are
    skipped.  A line starting with '|' becomes a ShellJob for the rest of
    the line; every other line becomes a UrlJob.

    Security checks for shell jobs - they are only accepted if the current
    UID is the same as the file/directory owner and only the owner can
    write.  If a shell job is found while these checks fail, the problems
    are printed to stderr and the process exits with status 1.
    """
    jobs = []

    current_uid = os.getuid()
    dirname = os.path.dirname(urls_txt) or '.'
    # Directory first, then the file - same order as the messages shown
    shelljob_errors = (_shelljob_permission_errors(dirname, current_uid) +
                       _shelljob_permission_errors(urls_txt, current_uid))
    allow_shelljobs = not shelljob_errors

    # try/finally instead of "with" keeps old Python 2 releases working
    urls_file = open(urls_txt)
    try:
        lines = urls_file.read().splitlines()
    finally:
        urls_file.close()

    for line in lines:
        stripped = line.strip()
        if stripped == '' or stripped.startswith('#'):
            continue

        if not line.startswith('|'):
            jobs.append(UrlJob(line))
        elif allow_shelljobs:
            jobs.append(ShellJob(line[1:]))
        else:
            print >>sys.stderr, '\n SECURITY WARNING - Cannot run shell jobs:\n'
            for error in shelljob_errors:
                print >>sys.stderr, ' ', error
            print >>sys.stderr, '\n Please remove shell jobs or fix these problems.\n'
            sys.exit(1)

    return jobs