Fix for 0MQ 2.0.7.
[julia.git] / dispatchers.lisp
blob8a2b7d629fd561a10afb497821e4e616d12ebdd1
1 (in-package :julia)
3 (defmacro defdispatch (name &body body)
4 `(defun ,(intern (format nil "DISPATCH-~s" name)) (from to uid args)
5 (declare (ignorable from to uid args))
6 (logger "dispatch ~s: from=~a to=~a uid=~a args='~a'~%" ',name from to uid args)
7 ,@body))
9 (defdispatch who
10 (send-msg from 'know (format nil "~a" *ids*)))
12 (defdispatch hello
13 (pushnew from *ids* :test #'string=)
14 (setq *ids* (sort *ids* #'string<))
15 (coordinate)
16 (logger "new *ids*: ~a~%" *ids*))
18 (defdispatch bye
19 (setq *ids* (sort (delete from *ids* :test #'string=) #'string<))
20 (coordinate)
21 (logger "new *ids*: ~a~%" *ids*))
23 ;; FIXME race condition if new WRITEs/COORDINATORs appear between syncs
24 (defdispatch sync
25 (let ((last-gen (parse-integer args)))
26 (logger "last-gen ~a~%" last-gen)
27 (loop for (id . data) in (reverse *feeds*)
28 when (>= id last-gen) do
29 (send-msg from 'feed (format nil "~a ~a" id data)))))
31 (defdispatch feed
32 (let (id data p)
33 (setq p (search " " args)
34 id (parse-integer (subseq args 0 p))
35 data (subseq args (1+ p)))
36 (assert (= id *gen*))
37 (feed data)))
39 (defdispatch know
40 (setq *ids*
41 (loop with new = (read-from-string args)
42 for i in new
43 when (string/= i *me*) do
44 (pushnew i *ids* :test #'string=)
45 finally (return (sort *ids* #'string<))))
46 (logger "new *ids*: ~a~%" *ids*)
47 (coordinate)
48 (when *not-synced*
49 (setq *not-synced* nil)
50 (send-msg from 'sync *gen*)))
52 (defdispatch write
53 ;; FIXME add support for critical sections
54 (when (and *coordinator* (string= to "COORDINATOR"))
55 (send-msg :all 'write args)
56 (send-msg from 'ack (format nil "~a" uid)))
57 (feed args))
59 #+nil(defdispatch read
60 (let ((key (sxhash (cons *gen* args))))
61 (multiple-value-bind (result presentp)
62 (gethash key *cache*)
63 (send-msg from 'answer
64 (format nil "~d ~d ~a" uid *gen*
65 (if presentp
66 result
67 (let ((ret (rep args)))
68 (logger "eval~%")
69 (setf (gethash key *cache*)ret)
70 ret)))))))
72 (defdispatch read
73 (send-msg from 'answer
74 (format nil "~d ~d ~a" uid *gen*
75 (let ((ret (rep args)))
76 (logger "eval~%")
77 ret))))