version 0.2: support for snapshots added
[smonitor.git] / monitor / collector.py
blob14a706b4ff8ca065f7c52cdb04ddb2f0bdcf7618
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # Server monitoring system
6 # Copyright © 2011 Rodrigo Eduardo Lazo Paz
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Data collection routines.
Collectors are classes which retrieve data from remote
servers. Currently, there is only one implemented class, which makes
parallel requests to http servers.
"""
28 """
30 __author__ = "rlazo.paz@gmail.com (Rodrigo Lazo)"
31 __version__ = 0.2
34 import Queue
35 import contextlib
36 import threading
37 import time
39 from urllib2 import urlopen, URLError
42 REQUEST_TIMEOUT = 5 # Seconds to timeout remote requests
45 # TODO: (09/19) add support for string values (Boolean?).
46 # TODO: (09/19) make parsing resilient to spaces in values.
47 # TODO: (09/20) logging facilities
# TODO: (09/20) logging facilities
def _retriever(source, timestamp, output_queue):
    """Fetches and parses data from a remote source.

    Queries a remote http server using the url

        http://host:port/?format=text

    It expects a plain text (no html formatting) list of values, with
    a single data point per line, in the form:

        [id@]varname: value

    - `id@` is an optional variable name prefix that will be discarded
      during parsing.
    - `varname` is the actual varname and must not contain '@' or ' '.
    - `value` must be an integer (42), a float (3.14159), or the
      literal strings 'true'/'false' (mapped to 1/0). Lines whose
      value does not parse are skipped instead of aborting the whole
      collection.

    Exactly one item is always put on `output_queue`: on success the
    pair ("ip:port", {"varname": (timestamp, value)}), on any failure
    the sentinel (None, None). This invariant is what lets
    `HttpCollector.collect` read a fixed number of queue items without
    deadlocking.

    Args:
    - `source`: Pair, (ip, port) address of source http server.
    - `timestamp`: Integer, unix time to use as collected data's timestamp.
    - `output_queue`: Queue, where to output the collected data.
    """
    host_address = "%s:%s" % (source[0], source[1])
    url = "http://%s/?format=text" % host_address
    vars_dict = {}
    try:
        with contextlib.closing(urlopen(url, timeout=REQUEST_TIMEOUT)) as fd:
            for line in fd:
                elements = line.split()
                if len(elements) != 2:
                    continue
                fullvarname, value = elements
                # Drop the optional "id@" prefix.
                # NOTE(review): per the documented "varname: value"
                # format, a trailing ':' would remain part of the key;
                # left as-is since downstream consumers may rely on it.
                varname = fullvarname.split('@')[-1]
                try:
                    if value == 'true':
                        value = 1
                    elif value == 'false':
                        value = 0
                    elif '.' in value:
                        value = float(value)
                    else:
                        value = int(value)
                except ValueError:
                    # Malformed value: skip this line rather than let
                    # the exception kill the worker thread, which would
                    # leave the queue consumer blocked forever.
                    continue
                vars_dict[varname] = (timestamp, value)
        output_queue.put((host_address, vars_dict))
    except URLError:
        print("ERROR RETRIEVING %s" % host_address)
        output_queue.put((None, None))
    except Exception as err:
        # Any other failure (e.g. socket timeout, bad response) must
        # still produce a queue item to preserve the one-item-per-call
        # invariant relied upon by HttpCollector.collect.
        print("ERROR RETRIEVING %s (%s)" % (host_address, err))
        output_queue.put((None, None))
class HttpCollector(object):
    """Retrieves data from HTTP sources.

    Registers a list of data source servers, which must expose data
    using plain-text format through HTTP, and collects their data. A
    simple use-case is to create a scheduling mechanism that initiates
    repetitive collections and feeds this information into a
    database. For more details about the data formatting, see the
    `HttpCollector.collect` method.
    """

    def __init__(self, sources):
        """Constructor.

        Args:
        - `sources`: Iterable, pairs of (ip, port) of data sources.
        """
        self._sources = sources
        # Result of the most recent collect() call; {} before the first one.
        self._last_collected = {}
        # Unix time of the most recent collect() call; 0 before the first one.
        self._timestamp = 0

    def collect(self):
        """Retrieves data from the sources.

        Multi-threaded data retrieval: one thread per source, each
        publishing its result on a shared queue. For details about
        data processing see `_retriever`. Sources that failed to
        respond are omitted from the result.

        Returns:
        Dictionary, {"ip:port": {"varname": (timestamp, value)}}.
        """
        result = {}
        queue = Queue.Queue()
        timestamp = int(time.time())
        threads = []
        for source in self._sources:
            threads.append(threading.Thread(target=_retriever,
                                            args=(source, timestamp, queue)))
            threads[-1].start()
        # Each _retriever call puts exactly one item on the queue (data
        # or a (None, None) sentinel), so reading len(sources) items
        # effectively waits for every worker to finish.
        for _ in range(len(self._sources)):
            host_address, data = queue.get()
            if host_address is not None:
                result[host_address] = data
        self._last_collected = result
        self._timestamp = timestamp
        return result

    def get_collected_data(self):
        """Returns a copy of the last collected data.

        See `HttpCollector.collect` for details about the returned
        data format.
        """
        # Shallow copy so callers cannot mutate the cached result;
        # previously the internal dict was returned by reference,
        # contradicting this docstring's long-standing promise.
        return dict(self._last_collected)

    def get_last_collection_timestamp(self):
        """Timestamp of the latest collection.

        Returns:
        Integer, unix time of the latest collection, or 0 if no
        collection was performed yet.
        """
        return self._timestamp