compresslog: print stats at exit
#!/usr/bin/python

## echo | compresslog /path/to/my/log

import os
import sys
import select
import signal
import gzip
import zlib
import subprocess
import fcntl
import errno
from optparse import OptionParser
import logging
import logging.handlers
import time
from datetime import datetime
import apache_log_parser
import numpy as np
import json
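
## Third-party dependencies; assumed to come from PyPI as
## apache-log-parser and numpy:
##   pip install apache-log-parser numpy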
class parse_stats:
    """Accumulate per-period hit/pageview/user stats from access log lines."""

    def __init__(self, logfmt):
        self._parse_line = apache_log_parser.make_parser(logfmt)
        self._reset()
        self.ajax_mimetypes = set(['application/ajax', 'text/xml'])

    def _reset(self):
        self.ls_users = set()
        self.ct_hits = 0
        self.ct_pviews = 0
        self.pf_pviews_time = list()
        self.ct_ajax_hits = 0

    def parse_line(self, logline):
        try:
            log_parsed = self._parse_line(logline)
        except Exception:
            logger.error("could not parse line: %s" % logline)
            return
        if log_parsed['status'] != '200':
            return
        self.ct_hits = self.ct_hits + 1
        if log_parsed['request_method'] == 'POST':
            if log_parsed['response_header_content_type'] in self.ajax_mimetypes:
                self.ct_ajax_hits = self.ct_ajax_hits + 1
            if log_parsed['response_header_content_type'] == 'text/plain':
                if log_parsed['request_url'].find('ajax') > -1:
                    self.ct_ajax_hits = self.ct_ajax_hits + 1

        if log_parsed['response_header_content_type'] != 'text/html':
            return
        self.ct_pviews = self.ct_pviews + 1
        self.pf_pviews_time.append(int(log_parsed['time_us']))
        user = log_parsed['note_moodleuser']
        if user != '-':
            self.ls_users.add(user)

    def log_stats(self, boundary):
        stats = { 'hits': 0,
                  'pviews': 0,
                  'pview_avg_us': 0,
                  'pview_50th_us': 0,
                  'pview_80th_us': 0,
                  'unique_users': 0,
                  'ajax_hits': 0,
                  'boundary_epoch': boundary }
        if self.ct_hits:
            stats['hits'] = self.ct_hits
            stats['ajax_hits'] = self.ct_ajax_hits
        if self.ct_pviews:
            stats['pviews'] = self.ct_pviews
            stats['pview_avg_us'] = np.average(self.pf_pviews_time)
            stats['pview_50th_us'] = np.percentile(self.pf_pviews_time, 50)
            stats['pview_80th_us'] = np.percentile(self.pf_pviews_time, 80)
            stats['unique_users'] = len(self.ls_users)
        # passing separators explicitly gives us more compact encoding
        logger.info("stats %s" % json.dumps(stats, separators=(',',':')))
        self._reset()
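
## log_stats() emits one syslog line per stats period; shape sketch
## with made-up values:
##   compresslog: stats {"hits":1234,"pviews":321,"pview_avg_us":183000.5,"pview_50th_us":95000.0,"pview_80th_us":260000.0,"unique_users":42,"ajax_hits":17,"boundary_epoch":1400000040}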

# TODO: Find a reasonable way to make this configurable...
pstats = parse_stats('%h %l %{MOODLEUSER}n %t %D \"%{Content-Type}o\" \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"')
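
## The Apache side is assumed to look roughly like this (sketch, not
## shipped with this script): a LogFormat matching the parser above,
## fed through a piped logger:
##   LogFormat "%h %l %{MOODLEUSER}n %t %D \"%{Content-Type}o\" \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" stats
##   CustomLog "|/usr/local/bin/compresslog --stats /var/log/apache2/access_log.gz" stats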

parser = OptionParser()
parser.add_option('-d', '--debug', dest='debug', default=0, action='count',
                  help='Print debugging info to stderr, no output to syslog')
parser.add_option('--stats', dest='stats', default=False, action='store_true',
                  help='Parse and process stats assuming it is a specially-formatted apache access log')
(opts, args) = parser.parse_args()
if not args:
    parser.error('destination log file argument is required')
fname = args[0]

## Setup logging to syslog bright and early
##
# when not in debug mode,
# this will route stderr to syslog, including
# stacktraces from unhandled exceptions
logger = logging.getLogger('www_stats')
logger.setLevel(logging.INFO)
if opts.debug > 0: # "debug" mode disables logging to syslog
    logh = logging.StreamHandler()
    # docs tell us to set logging at the handler level
    # but that's not what works, setting it at the global
    # logger level does work
    #logh.setLevel(logging.DEBUG)
    if opts.debug > 1:
        logger.setLevel(logging.DEBUG)
else:
    # writing to /dev/log makes it compat w systemd's journal
    logh = logging.handlers.SysLogHandler(address='/dev/log')
    # logger.setLevel(logging.DEBUG)
logf = logging.Formatter('compresslog: %(message)s')
logh.setFormatter(logf)
logger.addHandler(logh)

def errorprint(msg):
    logger.error(msg)
    sys.stderr.write(msg + '\n')

def alt_name(fname):
    # using microsecond (%f) to sidestep races
    datestr = datetime.now().strftime('_%Y%m%d-%H%M%S-%f')
    if fname.endswith('_log'):
        return fname[0:-4] + datestr + '_log'
    else:
        return fname + datestr
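
## e.g. alt_name('access_log') -> 'access_20140501-120000-123456_log'
## (timestamp shown is illustrative)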

## Work around several issues

## - gzip files, due to their nature cannot sustain two simultaneous
##   writers. Pure text log files can, thanks to promises from POSIX
##   implemented in the kernel about the integrity of writes of
##   newline-delimited data.

## - Apache 2.2 will sometimes open multiple piped loggers to the same
##   file. Apache 2.4 seems to only do it if the piped logger config
##   string is different. v2.2 is less predictable on this.
##   This means several compresslog processes are started at the same
##   time and race to create the file.

# renamed must persist across retries: if we already fell back to an
# alternate name and still cannot lock, give up instead of looping
renamed = False
while True:
    ## Try to open file exclusively
    f = gzip.open(fname, 'a')
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError, e:
        if e.errno == errno.EACCES or e.errno == errno.EAGAIN:
            sys.stderr.write("Cannot flock %s\n" % fname)
            if renamed:
                sys.exit(1)
            else:
                fname = alt_name(fname)
                sys.stderr.write("Retrying with %s\n" % fname)
                renamed = True
            continue
        else:
            errorprint("Unexpected error in flock(): %u" % e.errno)
            raise

    # make sure we are appending to an empty or a gzip file
    # to avoid appending to a plaintext log
    if os.stat(fname).st_size != 0:
        mimetype = subprocess.Popen(['/usr/bin/file', '--mime-type', '-b', fname],
                                    stdout=subprocess.PIPE).communicate()[0]
        mimetype = mimetype.rstrip()
        if mimetype != 'application/x-gzip':
            mvfname = alt_name(fname)
            errorprint("Destination file exists and is not compressed (%s), renaming old file to %s"
                       % (mimetype, mvfname))
            os.rename(fname, mvfname)
            # release the lock, restart the process
            # unfortunately this will append a gzip header
            f.close()
            continue

    # success - out of the error handling loop
    break

sig_wants_flush = False
sig_wants_flushexit = False

def sig_flush(signum, frame):
    logger.debug('sig_flush')
    global sig_wants_flush
    sig_wants_flush = True

def sig_flushexit(signum, frame):
    logger.debug('sig_flushexit')
    global sig_wants_flushexit
    global timeout
    sig_wants_flushexit = True
    timeout = 0

## ensure we flush on various signals
# USR1/USR2: user asked to flush out
# HUP: apache is no longer writing to us (hung up)
signal.signal(signal.SIGUSR1, sig_flush)
signal.signal(signal.SIGUSR2, sig_flush)
signal.signal(signal.SIGHUP, sig_flushexit)
signal.signal(signal.SIGTERM, sig_flushexit)

# while True / select / read(int) / O_NONBLOCK put
# input buffering explicitly in our hands;
# preserve the existing fd flags rather than clobbering them
flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, flags | os.O_NONBLOCK)

at_eof = False
timeout = 60
statsboundary = False
buf = False
buftail = False
linecount = 0
PERIOD = 60 # 1m
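
# Boundary arithmetic example (illustrative): with PERIOD = 60 and
# t = 1000007.3, statsboundary = t - (t % PERIOD) + PERIOD = 1000020.0,
# so select() sleeps at most ~12.7s before stats are due.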

while True:
    if opts.stats:
        # calculate next period boundary
        # and use that as timeout
        t = time.time()
        if (statsboundary == False) or (statsboundary < t):
            # next PERIOD boundary
            statsboundary = t - (t % PERIOD) + PERIOD
        timeout = statsboundary - t
        logger.debug('timeout in: %u s' % timeout)
    try:
        rfds = select.select([sys.stdin], [], [], timeout)[0]
        logger.debug('select: %u' % len(rfds))
        if len(rfds): # only read() when select says
            logger.debug('read()')
            # will not block
            buf = sys.stdin.read(4096)
            logger.debug('read %u' % len(buf))
            if buf:
                # If we do get EINTR during write, in Python < 2.7.2
                # the write may be screwed/incomplete, but we
                # have no way to know&retry.
                # http://bugs.python.org/issue10956
                f.write(buf)
                if opts.stats:
                    # collect lines
                    if buftail:
                        logger.debug('line tail length %u' % len(buftail))
                        buf = buftail + buf
                    loglines = buf.split('\n')
                    # save the tail for later
                    buftail = loglines.pop()
                    for logline in loglines:
                        linecount = linecount + 1
                        pstats.parse_line(logline)
            else:
                at_eof = True
                logger.debug('at_eof')
        if opts.stats:
            t = time.time()
            if t > statsboundary:
                pstats.log_stats(statsboundary)
    except select.error, e:
        if e[0] == errno.EINTR:
            logger.debug('E in select: %u' % e[0])
            continue # on signal, restart at the top
        else:
            raise
    except IOError, e:
        logger.debug('E in read() or write(): %u' % e[0])
        if e[0] == errno.EINTR:
            continue # on signal, restart at the top
        else:
            raise

    if at_eof or sig_wants_flushexit:
        f.close()
        sys.stdin.close()
        if opts.stats:
            pstats.log_stats(statsboundary)
        logger.debug('fh closed, exiting')
        sys.exit(0)

    if sig_wants_flush:
        sig_wants_flush = False
        try:
            f.flush(zlib.Z_FULL_FLUSH)
            logger.debug('flush')
        except Exception:
            logger.debug('E in flush')
            # ignore exceptions, try to keep rolling
            pass
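
## Operational sketch (assumed invocation, not part of this file):
## force a flush of the gzip stream from outside with
##   pkill -USR1 -f compresslog
## and request flush-and-exit with SIGHUP or SIGTERM.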