3 This event loop should handle both asynchronous App Engine RPC objects
4 (specifically urlfetch, memcache and datastore RPC objects) and arbitrary
5 callback functions with an optional time delay.
7 Normally, event loops are singleton objects, though there is no
8 enforcement of this requirement.
10 The API here is inspired by Monocle.
18 from .google_imports
import apiproxy_rpc
19 from .google_imports
import datastore_rpc
23 __all__
= ['EventLoop',
24 'add_idle', 'queue_call', 'queue_rpc',
26 'run', 'run0', 'run1',
29 _logging_debug
= utils
.logging_debug
31 _IDLE
= apiproxy_rpc
.RPC
.IDLE
32 _RUNNING
= apiproxy_rpc
.RPC
.RUNNING
33 _FINISHING
= apiproxy_rpc
.RPC
.FINISHING
36 class EventLoop(object):
43 current: a FIFO list of (callback, args, kwds). These callbacks
44 run immediately when the eventloop runs.
45 idlers: a FIFO list of (callback, args, kwds). Thes callbacks
46 run only when no other RPCs need to be fired first.
47 For example, AutoBatcher uses idler to fire a batch RPC even before
49 queue: a sorted list of (absolute time in sec, callback, args, kwds),
50 sorted by time. These callbacks run only after the said time.
51 rpcs: a map from rpc to (callback, args, kwds). Callback is called
52 when the rpc finishes.
54 self
.current
= collections
.deque()
55 self
.idlers
= collections
.deque()
56 self
.inactive
= 0 # How many idlers in a row were no-ops
61 """Remove all pending events without running any."""
62 while self
.current
or self
.idlers
or self
.queue
or self
.rpcs
:
63 current
= self
.current
67 _logging_debug('Clearing stale EventLoop instance...')
69 _logging_debug(' current = %s', current
)
71 _logging_debug(' idlers = %s', idlers
)
73 _logging_debug(' queue = %s', queue
)
75 _logging_debug(' rpcs = %s', rpcs
)
81 _logging_debug('Cleared')
83 def insort_event_right(self
, event
, lo
=0, hi
=None):
84 """Insert event in queue, and keep it sorted assuming queue is sorted.
86 If event is already in queue, insert it to the right of the rightmost
87 event (to keep FIFO order).
89 Optional args lo (default 0) and hi (default len(a)) bound the
90 slice of a to be searched.
93 event: a (time in sec since unix epoch, callback, args, kwds) tuple.
97 raise ValueError('lo must be non-negative')
102 if event
[0] < self
.queue
[mid
][0]: hi
= mid
104 self
.queue
.insert(lo
, event
)
106 def queue_call(self
, delay
, callback
, *args
, **kwds
):
107 """Schedule a function call at a specific time in the future."""
109 self
.current
.append((callback
, args
, kwds
))
112 when
= delay
+ time
.time()
114 # Times over a billion seconds are assumed to be absolute.
116 self
.insort_event_right((when
, callback
, args
, kwds
))
118 def queue_rpc(self
, rpc
, callback
=None, *args
, **kwds
):
119 """Schedule an RPC with an optional callback.
121 The caller must have previously sent the call to the service.
122 The optional callback is called with the remaining arguments.
124 NOTE: If the rpc is a MultiRpc, the callback will be called once
125 for each sub-RPC. TODO: Is this a good idea?
129 if rpc
.state
not in (_RUNNING
, _FINISHING
):
130 raise RuntimeError('rpc must be sent to service before queueing')
131 if isinstance(rpc
, datastore_rpc
.MultiRpc
):
134 # Don't call the callback until all sub-rpcs have completed.
136 def help_multi_rpc_along(r
=rpc
, c
=callback
, a
=args
, k
=kwds
):
137 if r
.state
== _FINISHING
and not r
.__done
:
140 # TODO: And again, what about exceptions?
141 callback
= help_multi_rpc_along
147 self
.rpcs
[rpc
] = (callback
, args
, kwds
)
149 def add_idle(self
, callback
, *args
, **kwds
):
150 """Add an idle callback.
152 An idle callback can return True, False or None. These mean:
154 - None: remove the callback (don't reschedule)
155 - False: the callback did no work; reschedule later
156 - True: the callback did some work; reschedule soon
158 If the callback raises an exception, the traceback is logged and
159 the callback is removed.
161 self
.idlers
.append((callback
, args
, kwds
))
164 """Run one of the idle callbacks.
167 True if one was called, False if no idle callback was called.
169 if not self
.idlers
or self
.inactive
>= len(self
.idlers
):
171 idler
= self
.idlers
.popleft()
172 callback
, args
, kwds
= idler
173 _logging_debug('idler: %s', callback
.__name
__)
174 res
= callback(*args
, **kwds
)
175 # See add_idle() for the meaning of the callback return value.
181 self
.idlers
.append(idler
)
183 _logging_debug('idler %s removed', callback
.__name
__)
187 """Run one item (a callback or an RPC wait_any).
190 A time to sleep if something happened (may be 0);
191 None if all queues are empty.
195 callback
, args
, kwds
= self
.current
.popleft()
196 _logging_debug('nowevent: %s', callback
.__name
__)
197 callback(*args
, **kwds
)
203 delay
= self
.queue
[0][0] - time
.time()
206 _
, callback
, args
, kwds
= self
.queue
.pop(0)
207 _logging_debug('event: %s', callback
.__name
__)
208 callback(*args
, **kwds
)
209 # TODO: What if it raises an exception?
213 rpc
= datastore_rpc
.MultiRpc
.wait_any(self
.rpcs
)
215 _logging_debug('rpc: %s.%s', rpc
.service
, rpc
.method
)
216 # Yes, wait_any() may return None even for a non-empty argument.
217 # But no, it won't ever return an RPC not in its argument.
218 if rpc
not in self
.rpcs
:
219 raise RuntimeError('rpc %r was not given to wait_any as a choice %r' %
221 callback
, args
, kwds
= self
.rpcs
[rpc
]
223 if callback
is not None:
224 callback(*args
, **kwds
)
225 # TODO: Again, what about exceptions?
230 """Run one item (a callback or an RPC wait_any) or sleep.
233 True if something happened; False if all queues are empty.
243 """Run until there's nothing left to do."""
244 # TODO: A way to stop running before the queue is empty.
251 class _State(utils
.threading_local
):
255 _EVENT_LOOP_KEY
= '__EVENT_LOOP__'
260 def get_event_loop():
261 """Return a EventLoop instance.
263 A new instance is created for each new HTTP request. We determine
264 that we're in a new request by inspecting os.environ, which is reset
265 at the start of each request. Also, each thread gets its own loop.
267 ev
= _state
.event_loop
268 if not os
.getenv(_EVENT_LOOP_KEY
) and ev
is not None:
270 _state
.event_loop
= None
274 _state
.event_loop
= ev
275 os
.environ
[_EVENT_LOOP_KEY
] = '1'
279 def queue_call(*args
, **kwds
):
280 ev
= get_event_loop()
281 ev
.queue_call(*args
, **kwds
)
284 def queue_rpc(rpc
, callback
=None, *args
, **kwds
):
285 ev
= get_event_loop()
286 ev
.queue_rpc(rpc
, callback
, *args
, **kwds
)
289 def add_idle(callback
, *args
, **kwds
):
290 ev
= get_event_loop()
291 ev
.add_idle(callback
, *args
, **kwds
)
295 ev
= get_event_loop()
300 ev
= get_event_loop()
305 ev
= get_event_loop()