App Engine Python SDK version 1.9.3
[gae.git] / python / google / appengine / ext / ndb / eventloop.py
blobe7878734e9316617c23b41d0043f5c20762707a8
1 """An event loop.
3 This event loop should handle both asynchronous App Engine RPC objects
4 (specifically urlfetch, memcache and datastore RPC objects) and arbitrary
5 callback functions with an optional time delay.
7 Normally, event loops are singleton objects, though there is no
8 enforcement of this requirement.
10 The API here is inspired by Monocle.
11 """
13 import collections
14 import logging
15 import os
16 import time
18 from .google_imports import apiproxy_rpc
19 from .google_imports import datastore_rpc
21 from . import utils
23 __all__ = ['EventLoop',
24 'add_idle', 'queue_call', 'queue_rpc',
25 'get_event_loop',
26 'run', 'run0', 'run1',
29 _logging_debug = utils.logging_debug
31 _IDLE = apiproxy_rpc.RPC.IDLE
32 _RUNNING = apiproxy_rpc.RPC.RUNNING
33 _FINISHING = apiproxy_rpc.RPC.FINISHING
36 class EventLoop(object):
37 """An event loop."""
39 def __init__(self):
40 """Constructor.
42 Fields:
43 current: a FIFO list of (callback, args, kwds). These callbacks
44 run immediately when the eventloop runs.
45 idlers: a FIFO list of (callback, args, kwds). Thes callbacks
46 run only when no other RPCs need to be fired first.
47 For example, AutoBatcher uses idler to fire a batch RPC even before
48 the batch is full.
49 queue: a sorted list of (absolute time in sec, callback, args, kwds),
50 sorted by time. These callbacks run only after the said time.
51 rpcs: a map from rpc to (callback, args, kwds). Callback is called
52 when the rpc finishes.
53 """
54 self.current = collections.deque()
55 self.idlers = collections.deque()
56 self.inactive = 0 # How many idlers in a row were no-ops
57 self.queue = []
58 self.rpcs = {}
60 def clear(self):
61 """Remove all pending events without running any."""
62 while self.current or self.idlers or self.queue or self.rpcs:
63 current = self.current
64 idlers = self.idlers
65 queue = self.queue
66 rpcs = self.rpcs
67 _logging_debug('Clearing stale EventLoop instance...')
68 if current:
69 _logging_debug(' current = %s', current)
70 if idlers:
71 _logging_debug(' idlers = %s', idlers)
72 if queue:
73 _logging_debug(' queue = %s', queue)
74 if rpcs:
75 _logging_debug(' rpcs = %s', rpcs)
76 self.__init__()
77 current.clear()
78 idlers.clear()
79 queue[:] = []
80 rpcs.clear()
81 _logging_debug('Cleared')
83 def insort_event_right(self, event, lo=0, hi=None):
84 """Insert event in queue, and keep it sorted assuming queue is sorted.
86 If event is already in queue, insert it to the right of the rightmost
87 event (to keep FIFO order).
89 Optional args lo (default 0) and hi (default len(a)) bound the
90 slice of a to be searched.
92 Args:
93 event: a (time in sec since unix epoch, callback, args, kwds) tuple.
94 """
96 if lo < 0:
97 raise ValueError('lo must be non-negative')
98 if hi is None:
99 hi = len(self.queue)
100 while lo < hi:
101 mid = (lo + hi) // 2
102 if event[0] < self.queue[mid][0]: hi = mid
103 else: lo = mid + 1
104 self.queue.insert(lo, event)
106 def queue_call(self, delay, callback, *args, **kwds):
107 """Schedule a function call at a specific time in the future."""
108 if delay is None:
109 self.current.append((callback, args, kwds))
110 return
111 if delay < 1e9:
112 when = delay + time.time()
113 else:
114 # Times over a billion seconds are assumed to be absolute.
115 when = delay
116 self.insort_event_right((when, callback, args, kwds))
118 def queue_rpc(self, rpc, callback=None, *args, **kwds):
119 """Schedule an RPC with an optional callback.
121 The caller must have previously sent the call to the service.
122 The optional callback is called with the remaining arguments.
124 NOTE: If the rpc is a MultiRpc, the callback will be called once
125 for each sub-RPC. TODO: Is this a good idea?
127 if rpc is None:
128 return
129 if rpc.state not in (_RUNNING, _FINISHING):
130 raise RuntimeError('rpc must be sent to service before queueing')
131 if isinstance(rpc, datastore_rpc.MultiRpc):
132 rpcs = rpc.rpcs
133 if len(rpcs) > 1:
134 # Don't call the callback until all sub-rpcs have completed.
135 rpc.__done = False
136 def help_multi_rpc_along(r=rpc, c=callback, a=args, k=kwds):
137 if r.state == _FINISHING and not r.__done:
138 r.__done = True
139 c(*a, **k)
140 # TODO: And again, what about exceptions?
141 callback = help_multi_rpc_along
142 args = ()
143 kwds = {}
144 else:
145 rpcs = [rpc]
146 for rpc in rpcs:
147 self.rpcs[rpc] = (callback, args, kwds)
149 def add_idle(self, callback, *args, **kwds):
150 """Add an idle callback.
152 An idle callback can return True, False or None. These mean:
154 - None: remove the callback (don't reschedule)
155 - False: the callback did no work; reschedule later
156 - True: the callback did some work; reschedule soon
158 If the callback raises an exception, the traceback is logged and
159 the callback is removed.
161 self.idlers.append((callback, args, kwds))
163 def run_idle(self):
164 """Run one of the idle callbacks.
166 Returns:
167 True if one was called, False if no idle callback was called.
169 if not self.idlers or self.inactive >= len(self.idlers):
170 return False
171 idler = self.idlers.popleft()
172 callback, args, kwds = idler
173 _logging_debug('idler: %s', callback.__name__)
174 res = callback(*args, **kwds)
175 # See add_idle() for the meaning of the callback return value.
176 if res is not None:
177 if res:
178 self.inactive = 0
179 else:
180 self.inactive += 1
181 self.idlers.append(idler)
182 else:
183 _logging_debug('idler %s removed', callback.__name__)
184 return True
186 def run0(self):
187 """Run one item (a callback or an RPC wait_any).
189 Returns:
190 A time to sleep if something happened (may be 0);
191 None if all queues are empty.
193 if self.current:
194 self.inactive = 0
195 callback, args, kwds = self.current.popleft()
196 _logging_debug('nowevent: %s', callback.__name__)
197 callback(*args, **kwds)
198 return 0
199 if self.run_idle():
200 return 0
201 delay = None
202 if self.queue:
203 delay = self.queue[0][0] - time.time()
204 if delay <= 0:
205 self.inactive = 0
206 _, callback, args, kwds = self.queue.pop(0)
207 _logging_debug('event: %s', callback.__name__)
208 callback(*args, **kwds)
209 # TODO: What if it raises an exception?
210 return 0
211 if self.rpcs:
212 self.inactive = 0
213 rpc = datastore_rpc.MultiRpc.wait_any(self.rpcs)
214 if rpc is not None:
215 _logging_debug('rpc: %s.%s', rpc.service, rpc.method)
216 # Yes, wait_any() may return None even for a non-empty argument.
217 # But no, it won't ever return an RPC not in its argument.
218 if rpc not in self.rpcs:
219 raise RuntimeError('rpc %r was not given to wait_any as a choice %r' %
220 (rpc, self.rpcs))
221 callback, args, kwds = self.rpcs[rpc]
222 del self.rpcs[rpc]
223 if callback is not None:
224 callback(*args, **kwds)
225 # TODO: Again, what about exceptions?
226 return 0
227 return delay
229 def run1(self):
230 """Run one item (a callback or an RPC wait_any) or sleep.
232 Returns:
233 True if something happened; False if all queues are empty.
235 delay = self.run0()
236 if delay is None:
237 return False
238 if delay > 0:
239 time.sleep(delay)
240 return True
242 def run(self):
243 """Run until there's nothing left to do."""
244 # TODO: A way to stop running before the queue is empty.
245 self.inactive = 0
246 while True:
247 if not self.run1():
248 break
251 class _State(utils.threading_local):
252 event_loop = None
255 _EVENT_LOOP_KEY = '__EVENT_LOOP__'
257 _state = _State()
260 def get_event_loop():
261 """Return a EventLoop instance.
263 A new instance is created for each new HTTP request. We determine
264 that we're in a new request by inspecting os.environ, which is reset
265 at the start of each request. Also, each thread gets its own loop.
267 ev = _state.event_loop
268 if not os.getenv(_EVENT_LOOP_KEY) and ev is not None:
269 ev.clear()
270 _state.event_loop = None
271 ev = None
272 if ev is None:
273 ev = EventLoop()
274 _state.event_loop = ev
275 os.environ[_EVENT_LOOP_KEY] = '1'
276 return ev
279 def queue_call(*args, **kwds):
280 ev = get_event_loop()
281 ev.queue_call(*args, **kwds)
284 def queue_rpc(rpc, callback=None, *args, **kwds):
285 ev = get_event_loop()
286 ev.queue_rpc(rpc, callback, *args, **kwds)
289 def add_idle(callback, *args, **kwds):
290 ev = get_event_loop()
291 ev.add_idle(callback, *args, **kwds)
294 def run():
295 ev = get_event_loop()
296 ev.run()
299 def run1():
300 ev = get_event_loop()
301 return ev.run1()
304 def run0():
305 ev = get_event_loop()
306 return ev.run0()