1 # This file is part of Indico.
2 # Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
4 # Indico is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License as
6 # published by the Free Software Foundation; either version 3 of the
7 # License, or (at your option) any later version.
9 # Indico is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
"""
Queue-style data structures.
"""
21 from BTrees
.IOBTree
import IOBTree
22 from BTrees
.OOBTree
import OOTreeSet
23 from BTrees
.Length
import Length
24 from persistent
import Persistent
class DuplicateElementException(Exception):
    """
    Tried to insert the same element twice in the queue.

    The offending element is passed as the exception's only argument.
    """
32 class PersistentWaitingQueue(Persistent
):
"""
A waiting queue, implemented on top of a map structure (BTree...).

It is persistent, but very vulnerable to conflicts. This is because sets
are used as containers, so a situation can arise in which two different
sets are assigned to the same timestamp. That is certain to result in a
conflict.

For that reason, the commits of objects like this one have to be carefully
synchronized. See `indico.modules.scheduler.controllers` for more info
(particularly the way we use the 'spool').
"""
46 super(PersistentWaitingQueue
, self
).__init
__()
50 # this counter keeps the number of elements
51 self
._elem
_counter
= Length(0)
52 self
._container
= IOBTree()
56 'garbage-collect' bins
58 if len(self
._container
[t
]) == 0:
59 del self
._container
[t
]
61 def _check_gc_consistency(self
):
63 'check that there are no empty bins'
65 for t
in self
._container
:
66 if len(self
._container
[t
]) == 0:
def enqueue(self, t, obj):
    """
    Add an element to the queue.

    :param t: key of the bin the element is filed under; the container is
              created as an ``IOBTree`` in this file, so this is presumably
              an integer timestamp -- confirm against callers
    :param obj: the element to store
    :raises DuplicateElementException: if ``obj`` is already in bin ``t``
    """
    container = self._container

    # bins are created lazily, the first time a key is used
    if t not in container:
        container[t] = OOTreeSet()

    # look the bin up once and reuse it for the check and the insertion
    target_bin = container[t]
    if obj in target_bin:
        raise DuplicateElementException(obj)

    target_bin.add(obj)
    # keep the element count in step with the contents of the bins
    self._elem_counter.change(1)
def dequeue(self, t, obj):
    """
    Remove an element from the queue.

    :param t: key of the bin the element was filed under
    :param obj: the element to remove

    Errors raised by the underlying containers when bin ``t`` does not
    exist or does not hold ``obj`` propagate unchanged; in that case the
    element counter is left untouched.
    """
    source_bin = self._container[t]
    source_bin.remove(obj)

    # Never leave an empty bin behind: the code that serves the next
    # element asserts that the bin for the next key is non-empty.
    if len(source_bin) == 0:
        del self._container[t]

    # keep the element count in step with the contents of the bins
    self._elem_counter.change(-1)
93 def _next_timestamp(self
):
95 Return the next 'priority' to be served
97 i
= iter(self
._container
)
102 except StopIteration:
107 Return the next element
109 t
= self
._next
_timestamp
()
112 assert(len(self
._container
[t
]) != 0)
114 # find the next element
115 i
= iter(self
._container
[t
])
126 Remove and return the next set of elements to be processed
139 Return the number of 'bins' (map entries) currently used
142 return len(self
._container
)
145 return self
._elem
_counter
()
def __getitem__(self, param):
    """
    Return the bin stored under key ``param``.

    The lookup is delegated straight to the underlying container, so a
    missing key fails in whatever way that container's lookup fails.
    """
    return self._container[param]
154 for tstamp
in iter(self
._container
):
155 cur_set
= self
._container
[tstamp
]
160 except StopIteration: