Remove obsolete test code from setup.py
[cds-indico.git] / indico / util / struct / queue.py
blob268e5715a60b0d356749593bf792e571e5c55190
1 # This file is part of Indico.
2 # Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
4 # Indico is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License as
6 # published by the Free Software Foundation; either version 3 of the
7 # License, or (at your option) any later version.
9 # Indico is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
17 """
18 Queue-style data structures
19 """
21 from BTrees.IOBTree import IOBTree
22 from BTrees.OOBTree import OOTreeSet
23 from BTrees.Length import Length
24 from persistent import Persistent
class DuplicateElementException(Exception):
    """
    Raised when an element is inserted into a queue that already holds it
    """
class PersistentWaitingQueue(Persistent):
    """
    A waiting queue, implemented using a map structure (BTree...)

    Elements are kept in 'bins': every timestamp ``t`` used as a key maps to
    a set holding the elements that were enqueued for that timestamp.

    It is persistent, but very vulnerable to conflicts. This is due to the
    fact that sets are used as containers, and a situation can arise where
    two different sets are assigned to the same timestamp. This will
    certainly result in a conflict.

    That said, the commits of objects like these have to be carefully
    synchronized. See `indico.modules.scheduler.controllers` for more info
    (particularly the way we use the 'spool').
    """

    def __init__(self):
        super(PersistentWaitingQueue, self).__init__()
        self._reset()

    def _reset(self):
        """
        (Re)initialize the queue as empty
        """
        # this counter keeps the number of elements
        self._elem_counter = Length(0)
        # maps timestamp -> set of elements (one 'bin' per timestamp)
        self._container = IOBTree()

    def _gc_bin(self, t):
        """
        'garbage-collect' bins: delete the bin stored under `t` if it is empty
        """
        if len(self._container[t]) == 0:
            del self._container[t]

    def _check_gc_consistency(self):
        """
        Check that there are no empty bins

        Returns True if every bin holds at least one element, False otherwise
        """
        return all(len(self._container[t]) != 0 for t in self._container)

    def enqueue(self, t, obj):
        """
        Add an element to the queue

        `t` is the timestamp (bin key) and `obj` the element to store.
        Raises DuplicateElementException if `obj` is already in the bin
        for `t`.
        """
        if t not in self._container:
            self._container[t] = OOTreeSet()

        target_bin = self._container[t]
        if obj in target_bin:
            raise DuplicateElementException(obj)

        target_bin.add(obj)
        self._elem_counter.change(1)

    def dequeue(self, t, obj):
        """
        Remove an element from the queue

        The bin for `t` is deleted if it becomes empty. Errors raised by the
        underlying containers (a missing bin for `t`, or `obj` not being in
        that bin) are propagated, and the element counter is left untouched
        in that case.
        """
        self._container[t].remove(obj)
        self._gc_bin(t)
        self._elem_counter.change(-1)

    def _next_timestamp(self):
        """
        Return the next 'priority' to be served

        This is the first key produced when iterating over the container,
        or None if the container has no bins.
        """
        return next(iter(self._container), None)

    def peek(self):
        """
        Return the next element

        Returns a `(timestamp, element)` tuple without removing anything,
        or None if the queue is empty.
        """
        t = self._next_timestamp()

        # compare with None explicitly: 0 is a legitimate timestamp and
        # must not be mistaken for "no bins left"
        if t is None:
            return None

        current_bin = self._container[t]

        # just to be sure (empty bins are removed by dequeue)
        assert len(current_bin) != 0

        # first element of the bin
        return t, next(iter(current_bin))

    def pop(self):
        """
        Remove and return the next element to be processed

        Returns the `(timestamp, element)` tuple that peek() would return,
        or None if the queue is empty.
        """
        pair = self.peek()
        if pair is not None:
            self.dequeue(*pair)
        return pair

    def nbins(self):
        """
        Return the number of 'bins' (map entries) currently used
        """
        # get 'real' len()
        return len(self._container)

    def __len__(self):
        """
        Return the number of elements, as tracked by the element counter
        """
        return self._elem_counter()

    def __getitem__(self, param):
        """
        Return the bin (set of elements) stored under the given timestamp
        """
        return self._container.__getitem__(param)

    def __iter__(self):
        """
        Iterate over all `(timestamp, element)` pairs, bin by bin
        """
        # tree iterator
        for tstamp in self._container:
            # set iterator
            for elem in self._container[tstamp]:
                yield tstamp, elem