Added test.lisp
[netclos.git] / message-handler.lisp
1 ;;;-----------------------------------------------------------------------------------
2 ;;; name : message-handler
3 ;;; description:
;;; notes       : Why do I use explicit messages instead of just sending a list to be
;;;               evaluated in the remote objectspace? Well,
;;;               - this allows for special handling of requests
;;;               - message-specific packing is possible
;;;               - it's easier to change the low-level communication mechanism, e.g. to pvm
;;;               - packing is done in the objspace-process, so the overhead for the calling
;;;                 process when passing a message to a proxy is not too big.
11 ;;; contact : me (Michael Trowe)
12 ;;; copyright :
13 ;;; history :
14 ;;; contents :
15 ;;;-----------------------------------------------------------------------------------
(in-package nc)

;;; Shadow CL:STREAM so that STREAM can be used as a slot/accessor name in
;;; this package.  The call is wrapped in EVAL-WHEN so that the shadowing is
;;; also in effect while the file is being compiled, not only when it is
;;; loaded (a bare top-level SHADOW call is not evaluated by COMPILE-FILE).
(eval-when (:compile-toplevel :load-toplevel :execute)
  (shadow "STREAM"))

(defvar *calling-os* ()
  "Objspace whose incoming message is currently being handled.
Bound by ACTIVE-BODY around the evaluation of each received form.")
;;; An object space known to this process, identified by HOST and PORT and
;;; written to through STREAM.
(defclass objspace (delayed-reply-object)
  ((host   :accessor host   :initarg :host)
   (port   :accessor port   :initarg :port)
   ;; The initarg is :OBJSTREAM (the original author's FIXME records that
   ;; it used to be :STREAM).
   (stream :accessor stream :initarg :objstream)))
;;; Parallel generic functions implemented by the methods in this file.
;;; The keyword after each name (:future / :past / :now) is passed to
;;; DEFPARGENERIC as written; see that macro for its exact meaning.
(defpargeneric send-request :future
  (objspace address caller method args))

(defpargeneric send-operation :past
  (objspace address caller method args))

(defpargeneric kernel-send :future
  (objspace message))

(defpargeneric remote-eval :now
  (objspace form))
;;; Autonomous object that reads incoming forms from STREAM on behalf of
;;; the objspace stored in its OBJSPACE slot.
(defclass receiver (autonomous-object)
  ((objspace :initarg :objspace :accessor objspace)
   (stream   :accessor stream   :initarg :stream)))
;;; Nothing to do after initialization of a receiver: the process setup
;;; that used to live here is kept below, commented out, for reference.
(defmethod initialize-instance :after ((inst receiver) &rest initargs)
  (declare (ignore initargs))
  nil)

#| Now taken care of by the autonomous-object initialize-instance:
  (setf (process inst) (mp:make-process :name (string (class-name (class-of inst)))
                                        :quantum 0.5
                                        :priority 1))
  (mp:process-preset (process inst) #'active-loop inst)
  (mp:process-enable (process inst)))
|#
;;; Main loop of a receiver: handle incoming messages forever.  An
;;; END-OF-FILE condition signalled while doing so is reported to the
;;; manager as a lost connection.
(defmethod active-loop ((receiver receiver))
  (handler-case
      (loop (active-body receiver))
    (end-of-file ()
      (connection-lost *manager* receiver))))
;;; One iteration of the receiver loop: wait for input on the receiver's
;;; stream, then read exactly one form from it and evaluate it.
;;;
;;; While the form is read and evaluated, *CALLING-OS* is bound to the
;;; receiver's objspace and *PACKAGE* to the NETCLOS package.
(defmethod active-body ((receiver receiver))
  (let ((*calling-os* (objspace receiver))
        (*package* (find-package "NETCLOS")))
    ;; The process stops until input through the stream is available.
    (acl-compat-mp:wait-for-input-available (stream receiver))
    ;;(print (stream receiver) excl::*initial-terminal-io*)
    #| Host-specific debugging aid, disabled.  When left enabled it consumed
       one form itself, so two forms were read and evaluated per iteration.
    (when (or (equal *local-host* "ki18"))
      (trace
       excl::stream-read-char
       excl::stream-write-char
       excl::stream-read-byte excl::stream-write-byte read))
    (let ((read-thing
            (read (stream receiver))))
      (when (or (equal *local-host* "ki18")
                (equal *local-host* "ki3"))
        (format t "B Rec: ~S B: Tran: ~S" (excl::bytes-received (stream receiver))
                (excl::bytes-transmitted (stream receiver))))
      (if (evaluable read-thing)
          (eval read-thing)
          (progn
            (break)
            )))
    |#
    ;; Read a message and evaluate it.
    ;; NOTE(review): this evaluates whatever arrives on the stream; the
    ;; peer must be trusted.
    (eval (read (stream receiver)))))
(defun evaluable (thing)
  "Return NIL when THING is a symbol (including NIL) and T for anything else."
  (not (symbolp thing)))
;;; Queue MESSAGE on the objspace's mail queue.  If the objspace is not
;;; marked active yet, mark it active and start a process that runs
;;; ACTIVE-LOOP on it.
(defmethod schedule ((sched standard-scheduler) (obj objspace) message)
  (enqueue (mail-queue obj) message)
  (unless (active-p obj)
    (setf (active-p obj) t)
    (let ((process-options (list :name (symbol-name (class-name (class-of obj)))
                                 :quantum 0.5
                                 :priority 1)))
      (acl-compat-mp:process-run-function process-options #'active-loop obj))))
;;; After an objspace has been killed, close its stream.
(defmethod kill :after ((obj objspace))
  (let ((connection (stream obj)))
    (close connection)))
;;; Resetting an objspace clears the output side of its stream.
(defmethod reset ((obj objspace))
  (let ((connection (stream obj)))
    (clear-output connection)))
;;; After a receiver has been killed, close its stream.
(defmethod kill :after ((obj receiver))
  (let ((connection (stream obj)))
    (close connection)))
;;; Resetting a receiver clears the input side of its stream.
(defmethod reset ((rec receiver))
  (let ((connection (stream rec)))
    (clear-input connection)))
;;; Build a request-call message for METHOD on the object at ADDRESS and
;;; hand it to KERNEL-SEND for SPACE.
(defmethod send-request ((space objspace) address caller method args)
  (let ((message (request-call-message :obj-address address
                                       :caller caller
                                       :method method
                                       :arguments args)))
    (kernel-send space message)))
;;; Build an operation-call message for METHOD on the object at ADDRESS and
;;; hand it to KERNEL-SEND for SPACE.
(defmethod send-operation ((space objspace) address caller method args)
  (let ((message (operation-call-message :obj-address address
                                         :caller caller
                                         :method method
                                         :arguments args)))
    (kernel-send space message)))
;;; Wrap FORM in a remote-evaluation message and send it to SPACE.
(defmethod remote-eval ((space objspace) form)
  (let ((message (remote-evaluation-message :form form)))
    (kernel-send space message)))
;;; Before a request is sent, store a future in SPACE and record the value
;;; returned by STORE-FUTURE as the request's message id.
(defmethod kernel-send :before ((space objspace) (message request))
  (let ((message-id (store-future space)))
    (setf (request-message-id message) message-id)))
;;; Pack MESSAGE onto the stream of SPACE and flush the stream.
;;; *PACKAGE* is bound to the NETCLOS package while packing.
;;; Returns no values.
(defmethod kernel-send ((space objspace) message)
  (let ((*package* (find-package "NETCLOS")))
    #| Host-specific debugging aid, disabled:
    (when (and (equal (host space) "ki18")
               (equal (rmi-class-name message) 'nc::function-node))
      (trace excl::stream-read-byte excl::stream-write-byte )
      (print (list "TOki18" message))
      (pack message excl::*initial-terminal-io*))
    |#
    (pack message (stream space))
    ;; Make sure the packed message actually leaves the output buffer.
    (finish-output (stream space))
    (values)))
;;; Move OBJ to DESTINATION-OS: send a moving message, wait (TOUCH) for the
;;; address that comes back, then turn the local object into a proxy for
;;; the moved object by changing its class and filling in the proxy slots.
(defmethod move ((obj mobile-object) destination-os)
  (let* ((new-address (touch (kernel-send destination-os
                                          (moving-message :moved-obj obj))))
         (master-name (class-name (class-of obj)))
         (proxy (change-class obj (ensure-proxy-class master-name))))
    ;; Initialization analogous to initialize-instance in distribute.lisp.
    (setf (masterclass-name proxy) master-name
          (remote-os proxy) destination-os
          (remote-address proxy) new-address
          ;; Form sent to object spaces other than the one holding the object.
          (other-pack-string proxy)
          (format nil "(np ~a '~S (fos \"~a\"))"
                  (remote-address proxy)
                  (masterclass-name proxy)
                  (host (remote-os proxy)))
          ;; Form sent to the object space that holds the object.
          (master-pack-string proxy)
          (format nil "(fetch-obj ~a)" (remote-address proxy)))
    (notify-proxy proxy)))
;;; Deliver CONTENT to the future that SPACE holds under MESSAGE-ID.
(defmethod receive-reply ((space objspace) message-id content)
  (let ((future (get-future space message-id)))
    (write-to-future future content)))
;;; METHOD names an ordinary function: call it with OBJ followed by ARGS.
;;; NOTE(review): the second parameter is named *CURRENT-ACTOR*; if that
;;; variable is declared special elsewhere, it is dynamically bound to the
;;; caller during the call -- confirm.
(defmethod receive-call ((method symbol) *current-actor* obj args)
  (let ((target (fdefinition method)))
    (apply target obj args)))
;;; METHOD is a list function name (presumably of the (SETF name) kind --
;;; confirm): the first element of ARGS is passed ahead of OBJ, and the
;;; remaining ARGS follow it.
(defmethod receive-call ((method cons) *current-actor* obj args)
  (let ((target (fdefinition method)))
    (apply target (first args) obj (rest args))))
;;; Message asking the receiver to call METHOD on the object found at
;;; OBJ-ADDRESS; the receive action TOUCHes the result of the call.
(defmessage request-call (obj-address caller method arguments)
  :request-p t
  :process-p t
  :receive-action (touch
                   (receive-call method
                                 caller
                                 (fetch-obj obj-address)
                                 arguments)))
;;; Message asking the receiver to call METHOD on the object found at
;;; OBJ-ADDRESS.  Unlike REQUEST-CALL it is not declared with :REQUEST-P.
(defmessage operation-call (obj-address caller method arguments)
  :process-p t
  :receive-action (receive-call method
                                caller
                                (fetch-obj obj-address)
                                arguments))
;;; Message carrying CONTENT for MESSAGE-ID; on receipt it is passed to
;;; RECEIVE-REPLY together with the objspace in *CALLING-OS*.
(defmessage reply (message-id content)
  :receive-action (receive-reply *calling-os*
                                 message-id
                                 content))
;;; Message that transfers MOVED-OBJ (packed with MOVE-PACK); on receipt
;;; the object is passed to ENSURE-EXPORTED.
(defmessage moving ((moved-obj :packing move-pack))
  :request-p t
  :receive-action (ensure-exported moved-obj))
;;; Remote make-instance: on receipt, MAKE-INSTANCE is applied to
;;; CLASS-NAME and the list INITARGS-B.
(defmessage rmi (class-name initargs-b)
  :request-p t
  :process-p t
  :receive-action (apply #'make-instance
                         class-name
                         initargs-b))
;;; Message carrying an arbitrary FORM that is EVALuated on receipt.
;;; NOTE(review): this runs whatever the sender supplies; the peer must
;;; be trusted.
(defmessage remote-evaluation (form)
  :request-p t
  :process-p t
  :receive-action (eval form))
;;; Fallback packing: newline, the object's readable printed form, and a
;;; trailing space (i.e. what PRINT emits).  Returns no values.
(defmethod pack ((obj t) &optional stream)
  (terpri stream)
  (prin1 obj stream)
  (write-char #\space stream)
  (values))
;;; Pack a symbol as a form that yields the symbol when evaluated by the
;;; receiver:
;;;   - keywords are printed as they are;
;;;   - uninterned symbols (no home package) become (MAKE-SYMBOL "NAME"),
;;;     because there is no package to intern them into -- the receiver
;;;     gets a fresh uninterned symbol of the same name;
;;;   - every other symbol becomes (INTERN '"NAME" :PACKAGE-NAME).
;;; Returns no values.
(defmethod pack ((obj symbol) &optional stream)
  (let ((home (symbol-package obj)))
    (cond ((keywordp obj)
           (print obj stream))
          ((null home)
           (print (list 'make-symbol (symbol-name obj)) stream))
          (t
           (print (list 'intern
                        (list 'quote (symbol-name obj))
                        (intern (package-name home) :keyword))
                  stream))))
  (values))
;;; NIL is packed as the two characters "()".  Returns no values.
(defmethod pack ((obj null) &optional stream)
  (write-char #\( stream)
  (write-char #\) stream)
  (values))
;;; A cons is packed as "(cons <car> <cdr>)", with both halves packed
;;; recursively.  Returns no values.
(defmethod pack ((obj cons) &optional stream)
  (flet ((emit (text)
           (write-string text stream)))
    (emit "(cons ")
    (pack (car obj) stream)
    (emit " ")
    (pack (cdr obj) stream)
    (emit ")"))
  (values))
;;; A string is packed as newline, its escaped (readable) printed form,
;;; and a trailing space -- what PRINT emits.  Returns no values.
(defmethod pack ((obj string) &optional stream)
  (terpri stream)
  (prin1 obj stream)
  (write-char #\space stream)
  (values))
;;; An array is packed as
;;;   (unpack-array <dimensions> <element-type> <element>* )
;;; with the dimensions, the element type and every element (in row-major
;;; order) packed recursively; each element is followed by a space.
;;; Returns no values.
(defmethod pack ((array array) &optional stream)
  (write-string "(unpack-array " stream)
  (pack (array-dimensions array) stream)
  (write-char #\space stream)
  (pack (array-element-type array) stream)
  (write-char #\space stream)
  (dotimes (index (array-total-size array))
    (pack (row-major-aref array index) stream)
    (write-char #\space stream))
  (write-char #\) stream)
  (values))
(defun unpack-array (dims type &rest elements)
  "Create an array of dimensions DIMS and element type TYPE and fill it,
in row-major order, with ELEMENTS.  Filling stops when either the array
or the element list is exhausted.  Returns the array."
  (let ((result (make-array dims :element-type type)))
    (loop for value in elements
          for index from 0 below (array-total-size result)
          do (setf (row-major-aref result index) value))
    result))
;;; Pack a function.
;;;   - If GET-DENOM returns a name for it, emit "(function NAME)".
;;;   - Otherwise export the function object and emit
;;;     "(fnp <address> '<class-name>)", i.e. a call to FNP, which takes an
;;;     address and a class name.  The two arguments are separated by a
;;;     space and the class name is quoted so that the receiver can read
;;;     and evaluate the form.
;;; Returns no values.
(defmethod pack ((func function) &optional stream)
  (let ((denom (get-denom func)))
    (cond (denom
           (write-string "(function " stream)
           (write-string (symbol-name denom) stream)
           (write-char #\) stream))
          (t
           (write-string "(fnp " stream)
           ;; WRITE uses only the first value returned by ENSURE-EXPORTED.
           (write (ensure-exported func) :stream stream)
           (write-string " '" stream)
           (write (class-name (class-of func)) :stream stream)
           (write-char #\) stream))))
  (values))
(defun fnp (address class-name)
  "Make a proxy of CLASS-NAME for ADDRESS in *CALLING-OS*, pass it to
NOTIFY-PROXY, and return a closure that forwards its arguments to the
resulting proxy via SEND-NOW with the operation APPLY."
  (let* ((fresh-proxy (make-proxy class-name address *calling-os*))
         (target (notify-proxy fresh-proxy)))
    (lambda (&rest rest)
      (send-now target 'apply rest))))
;;; Without *full-packing* it is left to the user how a class gets packed.
(defvar *full-packing* nil
  "When true, standard objects are packed with PACK-STANDARD-OBJECT;
otherwise the next applicable PACK method is used.")
;;; Dispatch on *FULL-PACKING*: when it is false, defer to the next PACK
;;; method; when it is true, pack the object with PACK-STANDARD-OBJECT.
(defmethod pack :around ((obj standard-object) &optional stream)
  (cond ((not *full-packing*)
         (call-next-method))
        (t
         (pack-standard-object obj stream))))
;;; Write a form to STREAM that recreates OBJ on the receiving side: it
;;; proclaims a freshly generated symbol special, sets it to a new instance
;;; of OBJ's class, and calls UNPACK-SLOTS with that symbol and a list
;;; whose elements are written by PACK-SLOTS-DEFAULT.  The (OBJ . symbol)
;;; pair is pushed onto *PACK-FORMS*.  Returns no values.
;;; NOTE(review): with STREAM = NIL, FORMAT returns a string instead of
;;; printing -- callers are expected to pass a real stream.
(defun pack-standard-object (obj stream)
  (let ((handle (gentemp "I")))
    (push (cons obj handle) *pack-forms*)
    (format stream "(progn (proclaim '(special ~a))
(prog1 (setq ~a (make-instance '~S))
(unpack-slots ~a (list"
            handle handle (class-name (class-of obj)) handle)
    (pack-slots-default (class-of obj) obj stream)
    ;; Close LIST, UNPACK-SLOTS, PROG1 and PROGN.
    (write-string "))))" stream))
  (values))
;;; Write the value of every slot of OBJ (slots taken from CLASS via
;;; CLASS-SLOTS) to STREAM using MOVE-PACK, in slot order.
;;; Every slot is read with SLOT-VALUE, so all slots must be bound.
(defun pack-slots-default (class obj stream)
  (loop for slot in (class-slots class)
        do (move-pack (slot-value obj (slot-definition-name slot)) stream)))
;;; Default packing of a standard object: export it and write the string
;;; that ENSURE-EXPORTED returns as its second value.  Returns no values.
(defmethod pack ((obj standard-object) &optional stream)
  (let ((obj-string (nth-value 1 (ensure-exported obj))))
    (write-string obj-string stream))
  (values))
(defun np (address class-name &optional (os *calling-os*))
  "Make a proxy of CLASS-NAME for ADDRESS in object space OS (default:
*CALLING-OS*) and return the result of passing it to NOTIFY-PROXY."
  (let ((fresh-proxy (make-proxy class-name address os)))
    (notify-proxy fresh-proxy)))
;;; Pack a proxy.  If the proxy's remote object space is EQ to
;;; *CURRENT-ACTOR*, write its master pack string; otherwise call
;;; NOTIFY-SENDING and write its other pack string.  Returns no values.
(defmethod pack ((proxy proxy) &optional stream)
  (cond ((eq (remote-os proxy) *current-actor*)
         (write-string (master-pack-string proxy) stream))
        (t
         (notify-sending proxy)
         (write-string (other-pack-string proxy) stream)))
  (values))
;;; An objspace is packed as a call to FOS with its host name:
;;; (fos "<host>").  Returns no values.
(defmethod pack ((space objspace) &optional stream)
  (let ((form-text (concatenate 'string "(fos \"" (host space) "\")")))
    (write-string form-text stream))
  (values))
(defun fos (host)
  "Return the first object space among (SPACES *MANAGER*) whose host is
EQUAL to HOST, or NIL when there is none."
  (find-if (lambda (space)
             (equal host (host space)))
           (spaces *manager*)))
;;; Print an objspace as #<TYPE on host: HOST>.  The host is read with
;;; SLOT-VALUE, so the HOST slot must be bound.
(defmethod print-object ((n objspace) stream)
  (let ((type-name (type-of n))
        (host-name (slot-value n 'host)))
    (format stream "#<~S on host: ~S>" type-name host-name)))
;;; NOTE(review): not referenced by any code visible in this part of the
;;; file; kept for whatever uses it elsewhere.
(defvar *pack* nil)
;;; Pack a structure as a call to its constructor:
;;;   (PKG::MAKE-<type> :<slot> <packed value> ...)
;;; PKG is the home package of the structure's class name.  Slot values are
;;; read through the function named <type>-<slot> in that package and are
;;; packed recursively.  Returns no values.
;;; NOTE(review): this relies on the structure using the default
;;; constructor and accessor names -- confirm for structures with a custom
;;; :CONC-NAME or :CONSTRUCTOR.
(defmethod pack ((struc structure-object) &optional stream)
  (let* ((type-name (class-name (class-of struc)))
         (home (symbol-package type-name)))
    (format stream "(~A::MAKE-" (package-name home))
    (write-string (string type-name) stream)
    (write-char #\space stream)
    (dolist (slot (class-slots (class-of struc)))
      (let ((slot-name (slot-definition-name slot)))
        (write-char #\: stream)
        (write-string (string slot-name) stream)
        (write-char #\space stream)
        (let ((reader (fdefinition (intern (format nil "~a-~a"
                                                   type-name
                                                   slot-name)
                                           home))))
          (pack (funcall reader struc) stream))))
    (write-char #\) stream)
    (values)))
(defun objectspace-active-p (objspace)
  "Return true when the STREAM slot of OBJSPACE is bound."
  (let ((slot-name 'stream))
    (slot-boundp objspace slot-name)))