version 0.2: support for snapshots added
[smonitor.git] / monitor / datastore.py
blob72acc3b1204e4bb8e2693a2fbb2f2a9311ef8075
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # Server monitoring system
6 # Copyright © 2011 Rodrigo Eduardo Lazo Paz
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Multi-dimensional Store.
25 Provides the DataStore class, a sparse three-dimensional storage data
26 structure. It stores data points, or vars, identified by name. Each
27 var has multiple timestamped (unix time) values, or `DataPoint`s,
grouped together into a `DataEntry`, sorted in ascending order,
oldest value first (the newest value is always last). For example,
var `stock_quote` could have the values (1316509200, 20.3),
(1316512800, 21.3), (1316516400, 21.1) representing the stock values
during three consecutive hours on
34 Vars belong to a group. Each group can have any number of vars. Using
35 the same var name in multiple groups simulates the concept of column
36 in SQL databases.
38 Data is persistent through Python's Pickle interface, therefore, the
39 three classes in this module, `DataPoint`, `DataEntry` and
40 `DataStore`, must be imported when using the dump/load methods,
41 otherwise, Python may complain about missing class declarations.
42 """
# Module metadata: author contact and current module version.
__author__ = "rlazo.paz@gmail.com (Rodrigo Lazo)"
__version__ = 0.2
48 import bisect
49 import cPickle
50 import collections
51 import os
class DataPoint(object):  # pylint: disable=R0903
    """A single timestamped value.

    Wraps a value of any type together with its unix timestamp.
    Ordering is defined purely by the timestamp; comparing against
    anything that is not a `DataPoint` always yields -1.
    """

    def __init__(self, timestamp, value):
        self.timestamp = timestamp
        self.value = value

    def __cmp__(self, other):
        # Only the timestamp participates in ordering; foreign types
        # always compare as "less than".
        if isinstance(other, DataPoint):
            return cmp(self.timestamp, other.timestamp)
        return -1

    def __str__(self):
        return "%s@%d" % (str(self.value), self.timestamp)

    def as_tuple(self):
        """Returns this point as a (timestamp, value) tuple."""
        return (self.timestamp, self.value)
78 # TODO: (09/20) compare performance of insert_point and insert_point
79 # using biesct.insort and sort
class DataEntry(object):
    """Time-ordered collection of related `DataPoint`s.

    Points are kept sorted by ascending timestamp (oldest first), so
    the newest point is always the last element of the list.
    """

    def __init__(self):
        # Sorted list of DataPoint, ascending by timestamp.
        self._points = []

    def insert_point(self, timestamp, value):
        """Inserts a single data point.

        Arguments:
        - `timestamp`: Positive integer, unix time associated with value.
        - `value`: Data to insert.

        Returns:
          True if inserted, False otherwise. Only an invalid timestamp
          will cause the operation to return False.
        """
        if not isinstance(timestamp, int) or timestamp < 0:
            return False
        bisect.insort(self._points, DataPoint(timestamp, value))
        return True

    def insert_points(self, values):
        """Inserts a list of values.

        Arguments:
        - `values`: Iterable, containing pairs of (timestamp,
          value). `timestamp` must be positive integer, `value` can be
          any object.

        Returns:
          True if all the points in `values` could be correctly
          inserted, False otherwise. Only an invalid timestamp will
          cause the operation to return False. Points with an invalid
          timestamp are skipped; valid ones are still inserted.
        """
        flag = True
        for timestamp, value in values:
            if not isinstance(timestamp, int) or timestamp < 0:
                flag = False
            else:
                self._points.append(DataPoint(timestamp, value))
        # One sort after the batch append is cheaper than repeated
        # bisect.insort calls.
        self._points.sort()
        return flag

    def get_latest(self):
        """Returns the latest, by timestamp, `DataPoint`."""
        return self._points[-1]

    def get_all(self):
        """Returns an iterable of all `DataPoint`s, sorted by timestamp."""
        return self._points

    def get_since(self, timestamp):
        """Builds an iterable of `DataPoint`s since `timestamp`.

        Arguments:
        - `timestamp`: Positive integer, represents the timestamp of
          the earliest `DataPoint` to return.

        Returns:
          An iterable of sorted, by timestamp, `DataPoint`s whose
          timestamp value is greater or equal to `timestamp` argument.
        """
        # bisect_left locates the first point with timestamp >= the
        # requested one, which includes *every* point sharing that
        # timestamp. The previous bisect_right-and-step-back approach
        # dropped all but one point when duplicates shared the boundary
        # timestamp.
        index = bisect.bisect_left(self._points, DataPoint(timestamp, None))
        return self._points[index:]
150 # TODO: (09/20) Make the object thread-safe.
class DataStore(object):
    """Multi-dimensional data store.

    See file level comments for further information.
    """

    def __init__(self):
        # Maps {group_name: {var_name: DataEntry}}.
        self._store = collections.defaultdict(dict)
        # Cache of every var name seen; lazily rebuilt by _update_vars.
        self._vars = set()
        # True when this object's contents came from a snapshot file.
        self._is_snapshot = False

    def insert(self, group, var, timestamp, value):
        """Inserts a single data point.

        Arguments:
        - `group`: String, point's group name
        - `var`: String, point's var name
        - `timestamp`: Positive integer, timestamp associated with
          this point
        - `value`: Object, data to store.

        Returns:
          True, if value was correctly inserted, False otherwise. Only
          invalid timestamp values will cause the rejection of an insert.
        """
        return self._store[group].setdefault(
            var, DataEntry()).insert_point(timestamp, value)

    def insert_dict(self, data_dict):
        """Inserts multiple data points.

        `data_dict` must be a dictionary of values in the form:

        {'group': {'var': (timestamp, val), "var": (timestamp, val)}}

        See `DataStore.insert` for detailed definition of valid values
        for each element of the dictionary.

        Returns:
          True, if all values were correctly inserted, False
          otherwise. Only invalid timestamp values will cause the
          rejection of an insert.
        """
        result = True
        for group_name, entry in data_dict.iteritems():
            for var_name, datapoint in entry.iteritems():
                # Call insert() unconditionally: the previous
                # `result and self.insert(...)` short-circuited after the
                # first failure and silently skipped every remaining point.
                inserted = self.insert(group_name, var_name,
                                       datapoint[0], datapoint[1])
                result = result and inserted
        return result

    def get_group(self, group):
        """Lists all vars, and corresponding `DataEntry`s, for `group`.

        Returns:
          A dictionary in the form {'varname': `DataEntry`}, or
          empty if group does not exist or doesn't contain data.
        """
        # Membership test first: indexing the defaultdict directly would
        # create an empty group as a side effect.
        return self._store[group] if group in self._store else {}

    def get_var(self, var):
        """Lists all groups, and corresponding `DataEntry`s, for `var`.

        Returns:
          A dictionary in the form {'group_name': `DataEntry`}, or
          empty if var does not exist.
        """
        self._update_vars()
        sol = {}
        if var in self._vars:
            for group in self._store.iterkeys():
                if var in self._store[group]:
                    sol[group] = self._store[group][var]
        return sol

    def list_groups(self):
        """Returns a list containing the name of every stored group."""
        return self._store.keys()

    def list_vars(self):
        """Returns a list containing the name of every stored var."""
        self._update_vars()
        return list(self._vars)

    def load(self, filename):
        """Loads data from `filename`.

        Any internal data stored will be deleted before loading the
        file.

        Arguments:
        - `filename`: String, path to a file created by
          `DataStore.dump` method.
        """
        with open(filename, 'rb') as fd:
            obj = cPickle.load(fd)
        self._store = obj
        self._is_snapshot = True
        # Invalidate the cache so _update_vars rebuilds it from the
        # freshly loaded store.
        self._vars.clear()

    def dump(self, filename):
        """Creates a snapshot of this object.

        Generated file is binary. For a textual representation of the
        data, see `DataStore.dump_as_text`.

        Arguments:
        - `filename`: String, path of the file to create/overwrite.
        """
        # Unbuffered write + flush + fsync so the snapshot reaches disk
        # before this method returns.
        with open(filename, 'wb', 0) as fd:
            cPickle.dump(self._store, fd, cPickle.HIGHEST_PROTOCOL)
            fd.flush()
            os.fsync(fd.fileno())

    def dump_obj(self):
        """Creates a snapshot of this object and returns it as a pickle string."""
        return cPickle.dumps(self._store)

    def dump_as_text(self, filename):
        """Creates a human-readable snapshot of this object.

        The file created by this method cannot be loaded again. To
        create a snapshot for data persistency, see `DataStore.dump`.

        Arguments:
        - `filename`: String, path of the file to create/overwrite.
        """
        with open(filename, 'w') as fd:
            for groupname, varss in self._store.iteritems():
                for varname, entry in varss.iteritems():
                    points = (str(x.as_tuple()) for x in entry.get_all())
                    fd.write("%s@%s: %s\n" % (varname, groupname,
                                              " ".join(points)))
            fd.flush()
            os.fsync(fd.fileno())

    def is_snapshot(self):
        """Returns True if this store's contents were loaded from a snapshot."""
        return self._is_snapshot

    def _update_vars(self, force=False):
        """Updates internal _vars cache.

        Only rebuilds when the cache is empty or `force` is True; note
        that update() only ever adds names, never removes stale ones.
        """
        if not self._vars or force:
            self._vars.update(*[v.keys() for v in self._store.itervalues()])