1 """RPC Implementation, originally written for the Python Idle IDE
3 For security reasons, GvR requested that Idle's Python execution server process
4 connect to the Idle process, which listens for the connection. Since Idle has
5 only one client per server, this was not a limitation.
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
15 | + -------------------+ |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
22 The RPCServer handler class is expected to provide register/unregister methods.
23 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
25 See the Idle run.main() docstring for further information on how this was
36 import cPickle
as pickle
45 def unpickle_code(ms
):
46 co
= marshal
.loads(ms
)
47 assert isinstance(co
, types
.CodeType
)
51 assert isinstance(co
, types
.CodeType
)
52 ms
= marshal
.dumps(co
)
53 return unpickle_code
, (ms
,)
55 # XXX KBK 24Aug02 function pickling capability not used in Idle
56 # def unpickle_function(ms):
59 # def pickle_function(fn):
60 # assert isinstance(fn, type.FunctionType)
# Register pickle_code / unpickle_code with copy_reg so that code objects
# (types.CodeType) can be pickled.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
64 # copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
# Loopback address.  The host part of an accepted connection's address is
# compared against this value (the `address[0] == LOCALHOST` check).
LOCALHOST = '127.0.0.1'
69 class RPCServer(SocketServer
.TCPServer
):
def __init__(self, addr, handlerclass=None):
    """Initialise the underlying TCPServer.

    addr is handed to TCPServer unchanged.  handlerclass is the request
    handler class to use; RPCHandler is substituted when it is None.
    """
    if handlerclass is not None:
        handler = handlerclass
    else:
        handler = RPCHandler
    SocketServer.TCPServer.__init__(self, addr, handler)
def server_bind(self):
    """Override the TCPServer method with a no-op.

    The connecting entity has no bind() phase, so nothing is done here.
    """
    return None
def server_activate(self):
    """Override the TCPServer method: connect() instead of listen().

    The connection direction is reversed, so self.server_address is the
    address of the Idle client that this side connects to.
    """
    connect = self.socket.connect
    connect(self.server_address)
def get_request(self):
    """Override the TCPServer method.

    Returns the already connected socket paired with self.server_address,
    as a (socket, address) tuple.
    """
    connected_pair = (self.socket, self.server_address)
    return connected_pair
93 def handle_error(self
, request
, client_address
):
94 """Override TCPServer method
96 Error message goes to __stderr__. No error message if exiting
97 normally or socket raised EOF. Other exceptions not handled in
98 server code will cause os._exit.
107 print>>erf
, '\n' + '-'*40
108 print>>erf
, 'Unhandled server exception!'
109 print>>erf
, 'Thread: %s' % threading
.currentThread().getName()
110 print>>erf
, 'Client Address: ', client_address
111 print>>erf
, 'Request: ', repr(request
)
112 traceback
.print_exc(file=erf
)
113 print>>erf
, '\n*** Unrecoverable, server exiting!'
117 #----------------- end class RPCServer --------------------
# Module-level queues used for 'QUEUE' requests.
# localcall() puts (seq, (method, args, kwargs)) entries on request_queue;
# pollresponse() takes entries from response_queue and sends them across
# the link.
# NOTE(review): a maxsize of 0 should mean "no size limit" per the Queue
# module documentation -- confirm.
request_queue = Queue.Queue(0)
response_queue = Queue.Queue(0)
124 class SocketIO(object):
128 def __init__(self
, sock
, objtable
=None, debugging
=None):
129 self
.sockthread
= threading
.currentThread()
130 if debugging
is not None:
131 self
.debugging
= debugging
134 objtable
= objecttable
135 self
.objtable
= objtable
146 "override for specific exit action"
149 def debug(self
, *args
):
150 if not self
.debugging
:
152 s
= self
.location
+ " " + str(threading
.currentThread().getName())
155 print>>sys
.__stderr
__, s
def register(self, oid, object):
    """Store object in self.objtable under the key oid.

    An existing entry for the same oid is replaced.
    """
    table = self.objtable
    table[oid] = object
160 def unregister(self
, oid
):
162 del self
.objtable
[oid
]
166 def localcall(self
, seq
, request
):
167 self
.debug("localcall:", request
)
169 how
, (oid
, methodname
, args
, kwargs
) = request
171 return ("ERROR", "Bad request format")
172 if not self
.objtable
.has_key(oid
):
173 return ("ERROR", "Unknown object id: %r" % (oid
,))
174 obj
= self
.objtable
[oid
]
175 if methodname
== "__methods__":
177 _getmethods(obj
, methods
)
178 return ("OK", methods
)
179 if methodname
== "__attributes__":
181 _getattributes(obj
, attributes
)
182 return ("OK", attributes
)
183 if not hasattr(obj
, methodname
):
184 return ("ERROR", "Unsupported method name: %r" % (methodname
,))
185 method
= getattr(obj
, methodname
)
188 ret
= method(*args
, **kwargs
)
189 if isinstance(ret
, RemoteObject
):
193 request_queue
.put((seq
, (method
, args
, kwargs
)))
194 return("QUEUED", None)
196 return ("ERROR", "Unsupported message type: %s" % how
)
202 msg
= "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
203 " Object: %s \n Method: %s \n Args: %s\n"
204 print>>sys
.__stderr
__, msg
% (oid
, method
, args
)
205 traceback
.print_exc(file=sys
.__stderr
__)
206 return ("EXCEPTION", None)
def remotecall(self, oid, methodname, args, kwargs):
    """Issue a call request and return its result.

    The request is submitted with asynccall(); the sequence number it
    returns is passed to asyncreturn(), whose result is returned.
    """
    self.debug("remotecall:asynccall: ", oid, methodname)
    ticket = self.asynccall(oid, methodname, args, kwargs)
    outcome = self.asyncreturn(ticket)
    return outcome
def remotequeue(self, oid, methodname, args, kwargs):
    """Issue a queue request and return its result.

    The request is submitted with asyncqueue(); the sequence number it
    returns is passed to asyncreturn(), whose result is returned.
    """
    self.debug("remotequeue:asyncqueue: ", oid, methodname)
    ticket = self.asyncqueue(oid, methodname, args, kwargs)
    outcome = self.asyncreturn(ticket)
    return outcome
218 def asynccall(self
, oid
, methodname
, args
, kwargs
):
219 request
= ("CALL", (oid
, methodname
, args
, kwargs
))
221 if threading
.currentThread() != self
.sockthread
:
222 cvar
= threading
.Condition()
223 self
.cvars
[seq
] = cvar
224 self
.debug(("asynccall:%d:" % seq
), oid
, methodname
, args
, kwargs
)
225 self
.putmessage((seq
, request
))
228 def asyncqueue(self
, oid
, methodname
, args
, kwargs
):
229 request
= ("QUEUE", (oid
, methodname
, args
, kwargs
))
231 if threading
.currentThread() != self
.sockthread
:
232 cvar
= threading
.Condition()
233 self
.cvars
[seq
] = cvar
234 self
.debug(("asyncqueue:%d:" % seq
), oid
, methodname
, args
, kwargs
)
235 self
.putmessage((seq
, request
))
def asyncreturn(self, seq):
    """Fetch and decode the response for sequence number seq.

    The response is obtained from getresponse(seq, wait=0.05) and handed
    to decoderesponse(); whatever decoderesponse() returns (or raises) is
    the outcome of this call.
    """
    self.debug("asyncreturn:%d:call getresponse(): " % seq)
    reply = self.getresponse(seq, wait=0.05)
    self.debug(("asyncreturn:%d:response: " % seq), reply)
    decoded = self.decoderesponse(reply)
    return decoded
244 def decoderesponse(self
, response
):
250 if how
== "EXCEPTION":
251 self
.debug("decoderesponse: EXCEPTION")
254 self
.debug("decoderesponse: EOF")
255 self
.decode_interrupthook()
258 self
.debug("decoderesponse: Internal ERROR:", what
)
259 raise RuntimeError, what
260 raise SystemError, (how
, what
)
262 def decode_interrupthook(self
):
267 """Listen on socket until I/O not ready or EOF
269 pollresponse() will loop looking for seq number None, which
270 never comes, and exit on EOFError.
274 self
.getresponse(myseq
=None, wait
=0.05)
276 self
.debug("mainloop:return")
279 def getresponse(self
, myseq
, wait
):
280 response
= self
._getresponse
(myseq
, wait
)
281 if response
is not None:
284 response
= how
, self
._proxify
(what
)
287 def _proxify(self
, obj
):
288 if isinstance(obj
, RemoteProxy
):
289 return RPCProxy(self
, obj
.oid
)
290 if isinstance(obj
, types
.ListType
):
291 return map(self
._proxify
, obj
)
292 # XXX Check for other types -- not currently needed
295 def _getresponse(self
, myseq
, wait
):
296 self
.debug("_getresponse:myseq:", myseq
)
297 if threading
.currentThread() is self
.sockthread
:
298 # this thread does all reading of requests or responses
300 response
= self
.pollresponse(myseq
, wait
)
301 if response
is not None:
304 # wait for notification from socket handling thread
305 cvar
= self
.cvars
[myseq
]
307 while not self
.responses
.has_key(myseq
):
309 response
= self
.responses
[myseq
]
310 self
.debug("_getresponse:%s: thread woke up: response: %s" %
312 del self
.responses
[myseq
]
313 del self
.cvars
[myseq
]
318 self
.nextseq
= seq
= self
.nextseq
+ 2
321 def putmessage(self
, message
):
322 self
.debug("putmessage:%d:" % message
[0])
324 s
= pickle
.dumps(message
)
325 except pickle
.PicklingError
:
326 print >>sys
.__stderr
__, "Cannot pickle:", repr(message
)
328 s
= struct
.pack("<i", len(s
)) + s
331 r
, w
, x
= select
.select([], [self
.sock
], [])
332 n
= self
.sock
.send(s
[:BUFSIZE
])
333 except (AttributeError, TypeError):
334 raise IOError, "socket no longer exists"
342 bufstate
= 0 # meaning: 0 => reading count; 1 => reading data
344 def pollpacket(self
, wait
):
346 if len(self
.buffer) < self
.bufneed
:
347 r
, w
, x
= select
.select([self
.sock
.fileno()], [], [], wait
)
351 s
= self
.sock
.recv(BUFSIZE
)
358 return self
._stage
1()
361 if self
.bufstate
== 0 and len(self
.buffer) >= 4:
363 self
.buffer = self
.buffer[4:]
364 self
.bufneed
= struct
.unpack("<i", s
)[0]
368 if self
.bufstate
== 1 and len(self
.buffer) >= self
.bufneed
:
369 packet
= self
.buffer[:self
.bufneed
]
370 self
.buffer = self
.buffer[self
.bufneed
:]
375 def pollmessage(self
, wait
):
376 packet
= self
.pollpacket(wait
)
380 message
= pickle
.loads(packet
)
381 except pickle
.UnpicklingError
:
382 print >>sys
.__stderr
__, "-----------------------"
383 print >>sys
.__stderr
__, "cannot unpickle packet:", repr(packet
)
384 traceback
.print_stack(file=sys
.__stderr
__)
385 print >>sys
.__stderr
__, "-----------------------"
389 def pollresponse(self
, myseq
, wait
):
390 """Handle messages received on the socket.
392 Some messages received may be asynchronous 'call' or 'queue' requests,
393 and some may be responses for other threads.
395 'call' requests are passed to self.localcall() with the expectation of
396 immediate execution, during which time the socket is not serviced.
398 'queue' requests are used for tasks (which may block or hang) to be
399 processed in a different thread. These requests are fed into
400 request_queue by self.localcall(). Responses to queued requests are
401 taken from response_queue and sent across the link with the associated
402 sequence numbers. Messages in the queues are (sequence_number,
403 request/response) tuples and code using this module removing messages
404 from the request_queue is responsible for returning the correct
405 sequence number in the response_queue.
407 pollresponse() will loop until a response message with the myseq
408 sequence number is received, and will save other responses in
409 self.responses and notify the owning thread.
413 # send queued response if there is one available
415 qmsg
= response_queue
.get(0)
420 message
= (seq
, ('OK', response
))
421 self
.putmessage(message
)
422 # poll for message on link
424 message
= self
.pollmessage(wait
)
425 if message
is None: # socket not ready
430 except AttributeError:
434 self
.debug("pollresponse:%d:myseq:%s" % (seq
, myseq
))
435 # process or queue a request
436 if how
in ("CALL", "QUEUE"):
437 self
.debug("pollresponse:%d:localcall:call:" % seq
)
438 response
= self
.localcall(seq
, resq
)
439 self
.debug("pollresponse:%d:localcall:response:%s"
442 self
.putmessage((seq
, response
))
444 # don't acknowledge the 'queue' request!
447 # return if completed message transaction
450 # must be a response for a different thread:
452 cv
= self
.cvars
.get(seq
, None)
453 # response involving unknown sequence number is discarded,
454 # probably intended for prior incarnation of server
457 self
.responses
[seq
] = resq
462 def handle_EOF(self
):
463 "action taken upon link being closed by peer"
465 self
.debug("handle_EOF")
466 for key
in self
.cvars
:
469 self
.responses
[key
] = ('EOF', None)
472 # call our (possibly overridden) exit function
476 "Classes using rpc client/server can override to augment EOF action"
479 #----------------- end class SocketIO --------------------
481 class RemoteObject(object):
487 objecttable
[oid
] = obj
488 return RemoteProxy(oid
)
490 class RemoteProxy(object):
492 def __init__(self
, oid
):
495 class RPCHandler(SocketServer
.BaseRequestHandler
, SocketIO
):
498 location
= "#S" # Server
def __init__(self, sock, addr, svr):
    """Initialise both base classes for the connected socket sock.

    sock, addr and svr are the request socket, client address and server
    object; all three are forwarded to BaseRequestHandler.__init__.
    """
    # Make this handler reachable from the server object first.
    svr.current_handler = self ## cgt xxx
    # SocketIO state is set up before BaseRequestHandler.__init__ runs.
    # NOTE(review): BaseRequestHandler.__init__ is documented to invoke
    # setup()/handle()/finish() itself, which is presumably why the order
    # matters -- confirm against the SocketServer docs.
    SocketIO.__init__(self, sock)
    SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
506 "handle() method required by SocketServer"
def get_remote_proxy(self, oid):
    """Return an RPCProxy for object id oid that uses this connection."""
    proxy = RPCProxy(self, oid)
    return proxy
512 class RPCClient(SocketIO
):
515 location
= "#C" # Client
517 nextseq
= 1 # Requests coming from the client are odd numbered
def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
    """Create the listening socket and start listening on address.

    The socket is created from family and type, stored as
    self.listening_sock, given SO_REUSEADDR, bound to address and put
    into listen mode with a backlog of 1.  SocketIO.__init__ is not
    called from here.
    """
    listener = self.listening_sock = socket.socket(family, type)
    reuse_option = (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.setsockopt(*reuse_option)
    listener.bind(address)
    listener.listen(1)
527 working_sock
, address
= self
.listening_sock
.accept()
529 print>>sys
.__stderr
__, "****** Connection request from ", address
530 if address
[0] == LOCALHOST
:
531 SocketIO
.__init
__(self
, working_sock
)
533 print>>sys
.__stderr
__, "** Invalid host: ", address
def get_remote_proxy(self, oid):
    """Return an RPCProxy for object id oid that uses this connection."""
    proxy = RPCProxy(self, oid)
    return proxy
539 class RPCProxy(object):
544 def __init__(self
, sockio
, oid
):
548 def __getattr__(self
, name
):
549 if self
.__methods
is None:
551 if self
.__methods
.get(name
):
552 return MethodProxy(self
.sockio
, self
.oid
, name
)
553 if self
.__attributes
is None:
554 self
.__getattributes
()
555 if self
.__attributes
.has_key(name
):
556 value
= self
.sockio
.remotecall(self
.oid
, '__getattribute__',
560 raise AttributeError, name
562 def __getattributes(self
):
563 self
.__attributes
= self
.sockio
.remotecall(self
.oid
,
564 "__attributes__", (), {})
566 def __getmethods(self
):
567 self
.__methods
= self
.sockio
.remotecall(self
.oid
,
568 "__methods__", (), {})
570 def _getmethods(obj
, methods
):
571 # Helper to get a list of methods from an object
572 # Adds names to dictionary argument 'methods'
573 for name
in dir(obj
):
574 attr
= getattr(obj
, name
)
577 if type(obj
) == types
.InstanceType
:
578 _getmethods(obj
.__class
__, methods
)
579 if type(obj
) == types
.ClassType
:
580 for super in obj
.__bases
__:
581 _getmethods(super, methods
)
583 def _getattributes(obj
, attributes
):
584 for name
in dir(obj
):
585 attr
= getattr(obj
, name
)
586 if not callable(attr
):
589 class MethodProxy(object):
591 def __init__(self
, sockio
, oid
, name
):
596 def __call__(self
, *args
, **kwargs
):
597 value
= self
.sockio
.remotecall(self
.oid
, self
.name
, args
, kwargs
)
601 # XXX KBK 09Sep03 We need a proper unit test for this module. Previously
602 # existing test code was removed at Rev 1.27.