2 # -*- coding: utf-8 -*-
4 # Server monitoring system
6 # Copyright © 2011 Rodrigo Eduardo Lazo Paz
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Multi-dimensional Store.

Provides the DataStore class, a sparse three-dimensional storage data
structure. It stores data points, or vars, identified by name. Each
var has multiple timestamped (unix time) values, or `DataPoint`s,
grouped together into a `DataEntry`, sorted in ascending order,
oldest value first. For example, var `stock_quote` could have the
values (1316509200, 20.3), (1316512800, 21.3), (1316516400, 21.1)
representing the stock values during three consecutive hours.

Vars belong to a group. Each group can have any number of vars. Using
the same var name in multiple groups simulates the concept of a column.

Data is persisted through Python's pickle interface; therefore, the
three classes in this module, `DataPoint`, `DataEntry` and
`DataStore`, must be imported when using the dump/load methods,
otherwise Python may complain about missing class declarations.
"""
44 __author__
= "rlazo.paz@gmail.com (Rodrigo Lazo)"
class DataPoint(object):  # pylint: disable=R0903
    """Single timestamped value.

    Stores a value, of any type, and the timestamp associated with
    it. Ordering and equality are based solely on the timestamp; the
    value never takes part in a comparison. A `DataPoint` always
    compares as smaller than, and different from, any object that is
    not a `DataPoint`.

    Rich comparison methods are defined next to `__cmp__` because
    Python 3 ignores `__cmp__` and has no `cmp` builtin; without them
    sorting or bisecting a list of points fails there.
    """

    def __init__(self, timestamp, value):
        """Builds a point.

        Arguments:
        - `timestamp`: Integer, unix time associated with `value`.
        - `value`: Object, data carried by this point.
        """
        self.timestamp = timestamp
        self.value = value

    def __cmp__(self, other):
        """Three-way comparison by timestamp.

        Returns:
        -1, 0 or 1 when this point is older than, as old as, or newer
        than `other`. Always -1 if `other` is not a `DataPoint`.
        """
        if not isinstance(other, DataPoint):
            return -1
        # Same result as the Python 2 `cmp` builtin, without needing it.
        return ((self.timestamp > other.timestamp) -
                (self.timestamp < other.timestamp))

    def __eq__(self, other):
        return self.__cmp__(other) == 0

    def __ne__(self, other):
        return self.__cmp__(other) != 0

    def __lt__(self, other):
        return self.__cmp__(other) < 0

    def __le__(self, other):
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        return self.__cmp__(other) >= 0

    def __hash__(self):
        # Equality only looks at the timestamp, so the hash must too.
        return hash(self.timestamp)

    def __str__(self):
        return "%s@%d" % (str(self.value), self.timestamp)

    # Same text for repr() so points stay readable inside containers.
    __repr__ = __str__

    def as_tuple(self):
        """Returns a tuple (timestamp, value)."""
        return (self.timestamp, self.value)
# TODO: (09/20) compare performance of insert_point and insert_points
# using bisect.insort and sort
class DataEntry(object):
    """Time-ordered list of related `DataPoint`s.

    Points are kept sorted by timestamp in ascending order: the oldest
    point comes first and the newest one is last.
    """

    def __init__(self):
        self._points = []

    @staticmethod
    def _is_valid_timestamp(timestamp):
        """Tells whether `timestamp` is a non-negative integer."""
        return isinstance(timestamp, int) and timestamp >= 0

    def insert_point(self, timestamp, value):
        """Inserts a single data point.

        Arguments:
        - `timestamp`: Non-negative integer, unix time associated
          with value.
        - `value`: Data to insert.

        Returns:
        True if inserted, False otherwise. Only an invalid timestamp
        will cause the operation to return False.
        """
        if not self._is_valid_timestamp(timestamp):
            return False
        bisect.insort(self._points, DataPoint(timestamp, value))
        return True

    def insert_points(self, values):
        """Inserts a list of values.

        Pairs with an invalid timestamp are skipped; every other pair
        is still inserted.

        Arguments:
        - `values`: Iterable, containing pairs of (timestamp,
          value). `timestamp` must be a non-negative integer, `value`
          can be any object.

        Returns:
        True if all the points in `values` could be correctly
        inserted, False otherwise. Only an invalid timestamp will
        cause the operation to return False.
        """
        all_valid = True
        accepted = []
        # Stage the new points first: if iterating `values` raises, the
        # stored list is left untouched and still sorted.
        for timestamp, value in values:
            if self._is_valid_timestamp(timestamp):
                accepted.append(DataPoint(timestamp, value))
            else:
                all_valid = False
        if accepted:
            self._points.extend(accepted)
            self._points.sort()
        return all_valid

    def get_latest(self):
        """Returns the latest, by timestamp, `DataPoint`.

        Raises:
        IndexError if no point has been inserted yet.
        """
        return self._points[-1]

    def get_all(self):
        """Returns an iterable of all `DataPoints`, sorted by timestamp."""
        return self._points

    def get_since(self, timestamp):
        """Builds a list of `DataPoints` since `timestamp`.

        Arguments:
        - `timestamp`: Non-negative integer, represents the timestamp
          of the earliest `DataPoint` to return.

        Returns:
        A new list of sorted, by timestamp, `DataPoints` whose
        timestamp value is greater or equal to `timestamp` argument.
        """
        probe = DataPoint(timestamp, None)
        # bisect_left stops at the first point that is not older than
        # `timestamp`, so every point sharing that exact timestamp is
        # part of the result, not just one of them.
        start = bisect.bisect_left(self._points, probe)
        return self._points[start:]
150 # TODO: (09/20) Make the object thread-safe.
151 class DataStore(object):
152 """Multi-dimensional data store.
154 See file level comments for further information.
158 self
._store
= collections
.defaultdict(dict)
160 self
._is
_snapshot
= False
def insert(self, group, var, timestamp, value):
    """Inserts a single data point.

    Arguments:
    - `group`: String, point's group name
    - `var`: String, point's var name
    - `timestamp`: Non-negative integer, timestamp associated with
      the value.
    - `value`: Object, data to store.

    Returns:
    True, if value was correctly inserted, False otherwise. Only
    invalid timestamp values will cause the rejection of an insert.
    A rejected insert leaves the store unchanged.
    """
    # dict.get does not trigger the defaultdict factory, so probing
    # for an unknown group does not create it.
    entry = self._store.get(group, {}).get(var)
    if entry is not None:
        return entry.insert_point(timestamp, value)
    # First point for this (group, var): register the new entry only
    # once the point has been accepted, so that a rejected insert
    # leaves no empty group or empty `DataEntry` behind.
    entry = DataEntry()
    if not entry.insert_point(timestamp, value):
        return False
    self._store.setdefault(group, {})[var] = entry
    return True
def insert_dict(self, data_dict):
    """Inserts multiple data points.

    `data_dict` must be a dictionary of values in the form:

    {'group': {'var': (timestamp, val), "var": (timestamp, val)}}

    See `DataStore.insert` for detailed definition of valid values
    for each element of the dictionary.

    Every point is attempted, even after one of them was rejected.

    Returns:
    True, if all values were correctly inserted, False
    otherwise. Only invalid timestamp values will cause the
    rejection of an insert.
    """
    all_inserted = True
    for group_name, entries in data_dict.items():
        for var_name, datapoint in entries.items():
            # insert() must run unconditionally: folding it into
            # `all_inserted and ...` would skip every point that
            # follows the first rejected one.
            inserted = self.insert(group_name, var_name,
                                   datapoint[0], datapoint[1])
            if not inserted:
                all_inserted = False
    return all_inserted
def get_group(self, group):
    """Lists all vars, and corresponding `DataEntries`, for `group`.

    Arguments:
    - `group`: String, name of the group to look up.

    Returns:
    A dictionary in the form {'varname': `DataEntry`}, or an
    empty one if group does not exist or doesn't contain data.
    """
    # dict.get never triggers the defaultdict factory, so asking for
    # an unknown group does not create it.
    return self._store.get(group, {})
def get_var(self, var):
    """Lists all groups, and corresponding `DataEntries`, for `var`.

    The store itself is scanned instead of trusting the `_vars`
    cache, which is not refreshed when new vars are inserted.

    Arguments:
    - `var`: String, name of the var to look up.

    Returns:
    A dictionary in the form {'groupname': `DataEntry`}, or an
    empty one if var does not exist.
    """
    found = {}
    for group, entries in self._store.items():
        if var in entries:
            found[group] = entries[var]
    return found
def list_groups(self):
    """Returns a list containing the name of every group stored."""
    return list(self._store)
231 """Returns a list contaning the name of every var store."""
233 return list(self
._vars
)
def load(self, filename):
    """Loads data from `filename`.

    Any internal data stored is discarded and replaced by the
    content of the file. If the file cannot be read or unpickled the
    exception propagates and the store is left as it was.

    WARNING: unpickling can run code chosen by whoever wrote the
    file. Only load snapshots that come from a trusted source.

    Arguments:
    - `filename`: String, path to a file created by
      `DataStore.dump` method.
    """
    with open(filename, 'rb') as fd:
        loaded = cPickle.load(fd)
    # Copy into a fresh defaultdict so that indexing an unknown group
    # keeps creating it, whatever mapping type the snapshot held.
    store = collections.defaultdict(dict)
    store.update(loaded)
    self._store = store
    self._is_snapshot = True
    # Forget the names cached for the previous content before
    # refreshing, otherwise they would outlive the data they named.
    self._vars = set()
    self._update_vars(force=True)
def dump(self, filename):
    """Creates a snapshot of this object.

    Generated file is binary. For a textual representation of the
    data, see `DataStore.dump_as_text`.

    Arguments:
    - `filename`: String, path of the file to create/overwrite.
    """
    # Default buffering lets the pickler batch its writes; the
    # explicit flush + fsync below still force the data to disk
    # before the file is closed.
    with open(filename, 'wb') as fd:
        cPickle.dump(self._store, fd, cPickle.HIGHEST_PROTOCOL)
        fd.flush()
        os.fsync(fd.fileno())
266 """Creates a snapshot of this objects and returns it as an object."""
267 return cPickle
.dumps(self
._store
)
269 def dump_as_text(self
, filename
):
270 """Creates a human-readable snapshot of this object.
272 The file created by this method cannot be loaded again. To
273 create an snapshot for data persistency, see `DataStore.dump`.
276 - `filename`: String, path of the file to create/overwrite.
278 with
open(filename
, 'w') as fd
:
279 for groupname
, varss
in self
._store
.iteritems():
280 for varname
, entry
in varss
.iteritems():
281 points
= (str(x
.as_tuple()) for x
in entry
.get_all())
282 fd
.write("%s@%s: %s\n" % (varname
, groupname
,
285 os
.fsync(fd
.fileno())
def is_snapshot(self):
    """Returns the snapshot flag.

    The flag starts as False and is set to True by `DataStore.load`.
    """
    return self._is_snapshot
def _update_vars(self, force=False):  # pylint: disable=W0613
    """Rebuilds the internal _vars cache from the store.

    The cache is rebuilt on every call: inserts do not refresh it,
    so a non-empty cache cannot be assumed to be current, and names
    that are no longer in the store must not linger.

    Arguments:
    - `force`: Boolean, kept for backward compatibility. It no
      longer changes the outcome.
    """
    names = set()
    for entries in self._store.values():
        # Iterating a dict yields its keys, i.e. the var names.
        names.update(entries)
    self._vars = names