3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
"""Container of APIProxy stubs for more convenient unit testing.

Classes/variables/functions defined here:
  APIProxyStubMap: container of APIProxy stubs.
  apiproxy: global instance of an APIProxyStubMap.
  MakeSyncCall: APIProxy entry point.
  UserRPC: User-visible class wrapping asynchronous RPCs.
"""
import inspect
import threading

from google.appengine.api import apiproxy_rpc
from google.appengine.runtime import apiproxy_errors
def CreateRPC(service, stubmap=None):
  """Creates a RPC instance for the given service.

  The instance is suitable for talking to remote services.
  Each RPC instance can be used only once, and should not be reused.

  Args:
    service: string representing which service to call.
    stubmap: optional APIProxyStubMap instance, for dependency injection.
      When omitted (None), the module-level 'apiproxy' map is used.

  Returns:
    Whatever the stub's CreateRPC() method returns.

  Raises:
    AssertionError or RuntimeError if the stub for service doesn't supply a
    CreateRPC method.
  """
  if stubmap is None:
    # Looked up at call time, so the global may be defined later in the module.
    stubmap = apiproxy
  stub = stubmap.GetStub(service)
  assert stub, 'No api proxy found for service "%s"' % service
  assert hasattr(stub, 'CreateRPC'), (('The service "%s" doesn\'t have ' +
                                       'a CreateRPC method.') % service)
  return stub.CreateRPC()
def MakeSyncCall(service, call, request, response, stubmap=None):
  """The APIProxy entry point for a synchronous API call.

  Args:
    service: string representing which service to call
    call: string representing which function to call
    request: protocol buffer for the request
    response: protocol buffer for the response
    stubmap: optional APIProxyStubMap instance, for dependency injection.
      When omitted (None), the module-level 'apiproxy' map is used.

  Returns:
    Response protocol buffer or None. Some implementations may return
    a response protocol buffer instead of modifying 'response'.
    Caller must use returned value in such cases. If 'response' is modified
    then returns None.

  Raises:
    apiproxy_errors.Error or a subclass.
  """
  if stubmap is None:
    # Looked up at call time, so the global may be defined later in the module.
    stubmap = apiproxy
  return stubmap.MakeSyncCall(service, call, request, response)
class ListOfHooks(object):
  """An ordered collection of hooks for a particular API call.

  A hook is a function that has exactly the same signature as
  a service stub. It will be called before or after an api hook is
  executed, depending on whether this list is for precall or postcall hooks.
  Hooks can be used for debugging purposes (check certain
  pre- or postconditions on api calls) or to apply patches to protocol
  buffers before/after a call gets submitted.
  """

  def __init__(self):
    """Constructor; starts with no hooks registered."""
    # Ordered list of (key, function, service, num_args) tuples.
    self.__content = []
    # Set of (key, module) pairs used to reject duplicate registrations.
    self.__unique_keys = set()

  def __len__(self):
    """Returns the number of elements in the collection."""
    return len(self.__content)

  def __Insert(self, index, key, function, service=None):
    """Inserts a hook at a certain position in the list.

    Args:
      index: the index of where to insert the function
      key: a unique key (within the module) for this particular function.
        If something from the same module with the same key is already
        registered, nothing will be added.
      function: the hook to be added.
      service: optional argument that restricts the hook to a particular api

    Returns:
      True if the collection was modified.
    """
    unique_key = (key, inspect.getmodule(function))
    if unique_key in self.__unique_keys:
      return False
    # inspect.getargspec() no longer exists on recent Python 3 releases;
    # prefer getfullargspec() when the interpreter provides it.
    get_spec = getattr(inspect, 'getfullargspec', None)
    if get_spec is None:
      get_spec = inspect.getargspec
    num_args = len(get_spec(function)[0])
    if inspect.ismethod(function):
      # Do not count the implicit first argument of a method.
      num_args -= 1
    self.__content.insert(index, (key, function, service, num_args))
    self.__unique_keys.add(unique_key)
    return True

  def Append(self, key, function, service=None):
    """Appends a hook at the end of the list.

    Args:
      key: a unique key (within the module) for this particular function.
        If something from the same module with the same key is already
        registered, nothing will be added.
      function: the hook to be added.
      service: optional argument that restricts the hook to a particular api

    Returns:
      True if the collection was modified.
    """
    return self.__Insert(len(self), key, function, service)

  def Push(self, key, function, service=None):
    """Inserts a hook at the beginning of the list.

    Args:
      key: a unique key (within the module) for this particular function.
        If something from the same module with the same key is already
        registered, nothing will be added.
      function: the hook to be added.
      service: optional argument that restricts the hook to a particular api

    Returns:
      True if the collection was modified.
    """
    return self.__Insert(0, key, function, service)

  def Clear(self):
    """Removes all hooks from the list (useful for unit tests)."""
    self.__content = []
    self.__unique_keys = set()

  def Call(self, service, call, request, response, rpc=None, error=None):
    """Invokes all hooks in this collection.

    NOTE: For backwards compatibility, if error is not None, hooks
    with 4 or 5 arguments are *not* called.  This situation
    (error is not None) only occurs when the RPC request raised an
    exception; in the past no hooks would be called at all in that case.

    Args:
      service: string representing which service to call
      call: string representing which function to call
      request: protocol buffer for the request
      response: protocol buffer for the response
      rpc: optional RPC used to make this call
      error: optional Exception instance to be passed as 6th argument
    """
    for key, function, srv, num_args in self.__content:
      # A hook registered without a service applies to every service.
      if srv is None or srv == service:
        if num_args == 6:
          function(service, call, request, response, rpc, error)
        elif error is not None:
          # Hooks that cannot receive the error are skipped on failure.
          pass
        elif num_args == 5:
          function(service, call, request, response, rpc)
        else:
          function(service, call, request, response)
class APIProxyStubMap(object):
  """Container of APIProxy stubs for more convenient unittesting.

  Stubs may be either trivial implementations of APIProxy services (e.g.
  DatastoreFileStub, UserServiceStub) or "real" implementations.

  For unittests, we may want to mix and match real and trivial implementations
  of services in order to better focus testing on individual service
  implementations. To achieve this, we allow the client to attach stubs to
  service names, as well as define a default stub to be used if no specific
  matching stub is identified.
  """

  def __init__(self, default_stub=None):
    """Constructor.

    Args:
      default_stub: optional stub

    'default_stub' will be used whenever no specific matching stub is found.
    """
    # Maps service name -> stub.
    self.__stub_map = {}
    self.__default_stub = default_stub
    self.__precall_hooks = ListOfHooks()
    self.__postcall_hooks = ListOfHooks()

  def GetPreCallHooks(self):
    """Gets a collection for all precall hooks."""
    return self.__precall_hooks

  def GetPostCallHooks(self):
    """Gets a collection for all postcall hooks."""
    return self.__postcall_hooks

  def ReplaceStub(self, service, stub):
    """Replace the existing stub for the specified service with a new one.

    NOTE: This is a risky operation; external callers should use this with
    caution.

    Args:
      service: string
      stub: stub
    """
    self.__stub_map[service] = stub

    # A stub stored under 'datastore' is also registered as 'datastore_v3'.
    # RegisterStub() asserts that 'datastore_v3' is not already present.
    if service == 'datastore':
      self.RegisterStub('datastore_v3', stub)

  def RegisterStub(self, service, stub):
    """Register the provided stub for the specified service.

    Args:
      service: string
      stub: stub

    Raises:
      AssertionError if a stub is already registered for 'service'.
    """
    assert service not in self.__stub_map, repr(service)
    self.ReplaceStub(service, stub)

  def GetStub(self, service):
    """Retrieve the stub registered for the specified service.

    Args:
      service: string

    Returns:
      Returns the stub registered for 'service', and returns the default stub
      if no such stub is found.
    """
    return self.__stub_map.get(service, self.__default_stub)

  def _CopyStubMap(self):
    """Get a copy of the stub map. For testing only.

    Returns:
      Get a shallow copy of the stub map.
    """
    return dict(self.__stub_map)

  def MakeSyncCall(self, service, call, request, response):
    """The APIProxy entry point.

    Precall hooks run before the call; postcall hooks run after it, and
    receive the exception when the call raised one (which is then re-raised).

    Args:
      service: string representing which service to call
      call: string representing which function to call
      request: protocol buffer for the request
      response: protocol buffer for the response

    Returns:
      Response protocol buffer or None. Some implementations may return
      a response protocol buffer instead of modifying 'response'.
      Caller must use returned value in such cases. If 'response' is modified
      then returns None.

    Raises:
      apiproxy_errors.Error or a subclass.
    """
    stub = self.GetStub(service)
    assert stub, 'No api proxy found for service "%s"' % service
    if hasattr(stub, 'CreateRPC'):
      # RPC-capable stub: drive the call through a one-shot RPC object.
      # This path returns None; the result is left in 'response'.
      rpc = stub.CreateRPC()
      self.__precall_hooks.Call(service, call, request, response, rpc)
      try:
        rpc.MakeCall(service, call, request, response)
        rpc.Wait()
        rpc.CheckSuccess()
      except Exception as err:
        self.__postcall_hooks.Call(service, call, request, response, rpc, err)
        raise
      else:
        self.__postcall_hooks.Call(service, call, request, response, rpc)
    else:
      # Plain stub: call it synchronously and pass its return value through.
      self.__precall_hooks.Call(service, call, request, response)
      try:
        returned_response = stub.MakeSyncCall(service, call, request, response)
      except Exception as err:
        self.__postcall_hooks.Call(service, call, request, response, None, err)
        raise
      else:
        # Hooks see the returned response when the stub supplied one.
        self.__postcall_hooks.Call(service, call, request,
                                   returned_response or response)
        return returned_response

  def CancelApiCalls(self):
    """Forwards CancelApiCalls() to the default stub, if one is set."""
    if self.__default_stub:
      self.__default_stub.CancelApiCalls()
class UserRPC(object):
  """Wrapper class for asynchronous RPC.

  Simplest low-level usage pattern:

    rpc = UserRPC('service', [deadline], [callback])
    rpc.make_call('method', request, response)
    ...
    rpc.wait()
    rpc.check_success()

  However, a service module normally provides a wrapper so that the
  typical usage pattern becomes more like this:

    from google.appengine.api import service
    rpc = service.create_rpc([deadline], [callback])
    service.make_method_call(rpc, [service-specific-args])
    ...
    rpc.wait()
    result = rpc.get_result()

  The service.make_method_call() function sets a service- and method-
  specific hook function that is called by rpc.get_result() with the
  rpc object as its only argument.  The hook function should call
  rpc.check_success() and then extract the user-level result from the
  rpc.response protobuffer.  Additional data may be passed from
  make_method_call() to the get_result hook via rpc.user_data.

  Also note wait_any() and wait_all(), which wait for multiple RPCs.
  """

  # Class-level defaults, so the properties work before make_call() runs.
  __method = None
  __get_result_hook = None
  __user_data = None
  __postcall_hooks_called = False
  __must_call_user_callback = False

  class MyLocal(threading.local):
    """Class to hold per-thread class level attributes."""

    may_interrupt_wait = False

  __local = MyLocal()

  def __init__(self, service, deadline=None, callback=None, stubmap=None):
    """Constructor.

    Args:
      service: The service name.
      deadline: Optional deadline.  Default depends on the implementation.
      callback: Optional argument-less callback function.
      stubmap: optional APIProxyStubMap instance, for dependency injection.
        When omitted (None), the module-level 'apiproxy' map is used.
    """
    if stubmap is None:
      stubmap = apiproxy
    self.__stubmap = stubmap
    self.__service = service
    self.__rpc = CreateRPC(service, stubmap)
    self.__rpc.deadline = deadline
    self.__rpc.callback = self.__internal_callback
    self.callback = callback
    # Creating an RPC resets the per-thread "interrupt wait_any()" flag.
    self.__class__.__local.may_interrupt_wait = False

  def __internal_callback(self):
    """This is the callback set on the low-level RPC object.

    It sets a flag on the current object indicating that the high-level
    callback should now be called.  If interrupts are enabled, it also
    interrupts the current wait_any() call by raising an exception.
    """
    self.__must_call_user_callback = True
    self.__rpc.callback = None
    if self.__class__.__local.may_interrupt_wait and not self.__rpc.exception:
      # wait_any() catches this exception and clears it from the RPC.
      raise apiproxy_errors.InterruptedError(None, self.__rpc)

  @property
  def service(self):
    """Return the service name."""
    return self.__service

  @property
  def method(self):
    """Return the method name."""
    return self.__method

  @property
  def deadline(self):
    """Return the deadline, if set explicitly (otherwise None)."""
    return self.__rpc.deadline

  @property
  def request(self):
    """Return the request protocol buffer object."""
    return self.__rpc.request

  @property
  def response(self):
    """Return the response protocol buffer object."""
    return self.__rpc.response

  @property
  def state(self):
    """Return the RPC state.

    Possible values are attributes of apiproxy_rpc.RPC: IDLE, RUNNING,
    FINISHING.
    """
    return self.__rpc.state

  @property
  def get_result_hook(self):
    """Return the get-result hook function."""
    return self.__get_result_hook

  @property
  def user_data(self):
    """Return the user data for the hook function."""
    return self.__user_data

  def make_call(self, method, request, response,
                get_result_hook=None, user_data=None):
    """Initiate a call.

    Args:
      method: The method name.
      request: The request protocol buffer.
      response: The response protocol buffer.
      get_result_hook: Optional get-result hook function.  If not None,
        this must be a function with exactly one argument, the RPC
        object (self).  Its return value is returned from get_result().
      user_data: Optional additional arbitrary data for the get-result
        hook function.  This can be accessed as rpc.user_data.  The
        type of this value is up to the service module.

    This function may only be called once per RPC object.  It sends
    the request to the remote server, but does not wait for a
    response.  This allows concurrent execution of the remote call and
    further local processing (e.g., making additional remote calls).

    Before the call is initiated, the precall hooks are called.
    """
    assert self.__rpc.state == apiproxy_rpc.RPC.IDLE, repr(self.state)

    self.__method = method
    self.__get_result_hook = get_result_hook
    self.__user_data = user_data

    self.__stubmap.GetPreCallHooks().Call(
        self.__service, method, request, response, self.__rpc)

    self.__rpc.MakeCall(self.__service, method, request, response)

  def wait(self):
    """Wait for the call to complete, and call callback if needed.

    This and wait_any()/wait_all() are the only time callback
    functions may be called.  (However, note that check_success() and
    get_result() call wait().)  Waiting for one RPC will not cause
    callbacks for other RPCs to be called.  Callback functions may
    call check_success() and get_result().

    Callbacks are called without arguments; if a callback needs access
    to the RPC object a Python nested function (a.k.a. closure) or a
    bound method may be used.  To facilitate this, the callback may be
    assigned after the RPC object is created (but before make_call()
    is called).

    Note: don't confuse callbacks with get-result hooks or precall
    and postcall hooks.
    """
    assert self.__rpc.state != apiproxy_rpc.RPC.IDLE, repr(self.state)
    if self.__rpc.state == apiproxy_rpc.RPC.RUNNING:
      self.__rpc.Wait()
    assert self.__rpc.state == apiproxy_rpc.RPC.FINISHING, repr(self.state)
    self.__call_user_callback()

  def __call_user_callback(self):
    """Call the high-level callback, if requested."""
    if self.__must_call_user_callback:
      # Clear the flag first so the callback runs at most once per request.
      self.__must_call_user_callback = False
      if self.callback is not None:
        self.callback()

  def check_success(self):
    """Check for success of the RPC, possibly raising an exception.

    This function should be called at least once per RPC.  If wait()
    hasn't been called yet, it is called first.  If the RPC caused
    an exceptional condition, an exception will be raised here.
    The first time check_success() is called, the postcall hooks
    are called.
    """
    self.wait()
    try:
      self.__rpc.CheckSuccess()
    except Exception as err:
      if not self.__postcall_hooks_called:
        self.__postcall_hooks_called = True
        self.__stubmap.GetPostCallHooks().Call(self.__service, self.__method,
                                               self.request, self.response,
                                               self.__rpc, err)
      raise
    else:
      if not self.__postcall_hooks_called:
        self.__postcall_hooks_called = True
        self.__stubmap.GetPostCallHooks().Call(self.__service, self.__method,
                                               self.request, self.response,
                                               self.__rpc)

  def get_result(self):
    """Get the result of the RPC, or possibly raise an exception.

    This implies a call to check_success().  If a get-result hook was
    passed to make_call(), that hook is responsible for calling
    check_success(), and the return value of the hook is returned.
    Otherwise, check_success() is called directly and None is
    returned.
    """
    if self.__get_result_hook is None:
      self.check_success()
      return None
    else:
      return self.__get_result_hook(self)

  @classmethod
  def __check_one(cls, rpcs):
    """Check the list of RPCs for one that is finished, or one that is running.

    Args:
      rpcs: Iterable collection of UserRPC instances.

    Returns:
      A pair (finished, running), as follows:
      (UserRPC, None) indicating the first RPC found that is finished;
      (None, UserRPC) indicating the first RPC found that is running;
      (None, None) indicating no RPCs are finished or running.
    """
    rpc = None
    for rpc in rpcs:
      assert isinstance(rpc, cls), repr(rpc)
      state = rpc.__rpc.state
      if state == apiproxy_rpc.RPC.FINISHING:
        rpc.__call_user_callback()
        return rpc, None
      assert state != apiproxy_rpc.RPC.IDLE, repr(rpc)
    # Nothing finished: report the last RPC iterated (None if rpcs is empty).
    return None, rpc

  @classmethod
  def wait_any(cls, rpcs):
    """Wait until an RPC is finished.

    Args:
      rpcs: Iterable collection of UserRPC instances.

    Returns:
      A UserRPC instance, indicating the first RPC among the given
      RPCs that finished; or None, indicating that either an RPC not
      among the given RPCs finished in the mean time, or the iterable
      is empty.

    NOTES:

    (1) Repeatedly calling wait_any() with the same arguments will not
        make progress; it will keep returning the same RPC (the one
        that finished first).  The callback, however, will only be
        called the first time the RPC finishes (which may be here or
        in the wait() method).

    (2) It may return before any of the given RPCs finishes, if
        another pending RPC exists that is not included in the rpcs
        argument.  In this case the other RPC's callback will *not*
        be called.  The motivation for this feature is that wait_any()
        may be used as a low-level building block for a variety of
        high-level constructs, some of which prefer to block for the
        minimal amount of time without busy-waiting.
    """
    # rpcs is iterated more than once, so a one-shot iterator is rejected.
    assert iter(rpcs) is not rpcs, 'rpcs must be a collection, not an iterator'
    finished, running = cls.__check_one(rpcs)
    if finished is not None:
      return finished
    if running is None:
      return None
    try:
      cls.__local.may_interrupt_wait = True
      try:
        running.__rpc.Wait()
      except apiproxy_errors.InterruptedError as err:
        # Raised by __internal_callback() to cut the wait short; clear it
        # from the RPC so it is not treated as that RPC's own failure.
        err.rpc._exception = None
        err.rpc._traceback = None
    finally:
      cls.__local.may_interrupt_wait = False
    finished, _ = cls.__check_one(rpcs)
    return finished

  @classmethod
  def wait_all(cls, rpcs):
    """Wait until all given RPCs are finished.

    This is a thin wrapper around wait_any() that loops until all
    given RPCs have finished.

    Args:
      rpcs: Iterable collection of UserRPC instances.
    """
    # Work on a private set so the caller's collection is not modified.
    rpcs = set(rpcs)
    while rpcs:
      finished = cls.wait_any(rpcs)
      if finished is not None:
        rpcs.remove(finished)
def GetDefaultAPIProxy():
  """Builds the APIProxyStubMap used as the module-level default.

  Tries to import google.appengine.runtime and use its 'apiproxy'
  attribute as the default stub.

  Returns:
    An APIProxyStubMap whose default stub is runtime.apiproxy, or one with
    no default stub if the import fails or the attribute is missing.
  """
  try:
    # The import happens inside the function, at call time.
    runtime_pkg = __import__('google.appengine.runtime', globals(), locals(),
                             ['apiproxy'])
    stub_map = APIProxyStubMap(runtime_pkg.apiproxy)
  except (AttributeError, ImportError):
    stub_map = APIProxyStubMap()
  return stub_map
# Global APIProxyStubMap instance, created once when this module is imported.
apiproxy = GetDefaultAPIProxy()