1 ;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp -*-
3 ;; Copyright (C) 2006, 2007 Stelian Ionescu
5 ;; This code is free software; you can redistribute it and/or
6 ;; modify it under the terms of the version 2.1 of
7 ;; the GNU Lesser General Public License as published by
8 ;; the Free Software Foundation, as clarified by the
9 ;; preamble found here:
10 ;; http://opensource.franz.com/preamble.html
12 ;; This program is distributed in the hope that it will be useful,
13 ;; but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ;; GNU General Public License for more details.
17 ;; You should have received a copy of the GNU Lesser General
18 ;; Public License along with this library; if not, write to the
19 ;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 ;; Boston, MA 02110-1301, USA
22 (in-package :io.multiplex
)
24 (eval-when (:compile-toplevel
:load-toplevel
:execute
)
25 (defvar *available-multiplexers
* nil
)
26 (defvar *best-available-multiplexer
* nil
))
31 (defclass event-base
()
32 ((mux :initform
(make-instance *best-available-multiplexer
*)
34 (fds :initform
(make-hash-table :test
'eql
)
36 (timeouts :initform
(make-queue)
38 (main-timeout :initform nil
39 :accessor main-timeout-of
)
44 (defmethod print-object ((base event-base
) stream
)
45 (print-unreadable-object (base stream
:type nil
:identity t
)
46 (format stream
"event base, ~A FDs monitored, using: ~A"
47 (hash-table-count (fds-of base
))
51 (defgeneric close-event-base
(event-base)
52 (:method
((event-base event-base
))
53 (with-accessors ((mux mux-of
)) event-base
54 (close-multiplexer mux
)
55 (mapc #'(lambda (slot)
56 (setf (slot-value event-base slot
) nil
))
61 (defgeneric add-fd
(base fd event-type function
&key timeout persistent
))
64 (defgeneric add-timeout
(event-base function timeout
&key persistent
))
67 (defgeneric remove-event
(event-base event
))
70 (defgeneric remove-events
(event-base event-list
))
73 (defgeneric event-dispatch
(event-base &key timeout only-once
))
76 (defgeneric exit-event-loop
(event-base &key delay
)
77 (:method
((event-base event-base
) &key delay
)
78 (setf (main-timeout-of event-base
)
79 (add-timeout event-base
80 #'(lambda (fd event-type
)
81 (declare (ignore fd event-type
))
82 (setf (exit-p event-base
) t
))
83 delay
:persistent nil
))))
86 (defgeneric event-base-empty-p
(event-base)
87 (:method
((event-base event-base
))
88 (and (zerop (hash-table-count (fds-of event-base
)))
89 (queue-empty-p (timeouts-of event-base
)))))
92 (defgeneric fd-entry-of
(event-base fd
)
93 (:method
((event-base event-base
) fd
)
94 (gethash fd
(fds-of event-base
))))
97 (defun %add-event
(event-base event
&optional fd-entry
)
98 (with-accessors ((fds fds-of
) (timeouts timeouts-of
)) event-base
99 (when (event-timeout event
)
100 ;; add the event to the timeout queue
101 (queue-sorted-insert timeouts event
#'< #'event-abs-timeout
))
102 (let ((fd (event-fd event
)))
103 ;; if it's an FD event add it to its fd-entry int the FDs hash-table
104 ;; if there's no such fd-entry, create it
106 (fd-entry-add-event fd-entry event
)
107 (setf (gethash fd fds
) fd-entry
))
111 (defun %remove-event
(event-base event
)
112 (with-accessors ((fds fds-of
) (timeouts timeouts-of
)) event-base
113 (when (event-timeout event
)
114 ;; remove the event from the timeout queue
115 (queue-delete timeouts event
))
116 (let ((fd (event-fd event
)))
117 ;; if it's an FD event remove it from its fd-entry
118 ;; if the fd-emtry is then empty, remove it
120 (let ((fd-entry (gethash fd fds
)))
122 (fd-entry-del-event fd-entry event
)
123 (when (fd-entry-empty-p fd-entry
)
128 (defun calc-possible-edge-change-when-adding (fd-entry event-type
)
129 (cond ((and (eql event-type
:read
)
130 (queue-empty-p (fd-entry-read-events fd-entry
)))
132 ((and (eql event-type
:write
)
133 (queue-empty-p (fd-entry-write-events fd-entry
)))
137 (defmethod add-fd ((event-base event-base
) fd event-type function
&key timeout persistent
)
138 (check-type event-type fd-event
)
140 (let ((fd-limit (fd-limit-of (mux-of event-base
))))
141 (when (and fd-limit
(> fd fd-limit
))
142 (error "Cannot add such a large FD: ~A" fd
)))
143 (let ((current-entry (fd-entry-of event-base fd
))
144 (event (make-event fd event-type function persistent
145 (abs-timeout timeout
)
146 (normalize-timeout timeout
)))
150 (setf edge-change
(calc-possible-edge-change-when-adding
151 current-entry event-type
))
152 (%add-event event-base event current-entry
)
154 (setf (fd-entry-edge-change current-entry
) edge-change
)
155 (update-fd (mux-of event-base
) current-entry
)
156 (setf (fd-entry-edge-change current-entry
) nil
)))
158 (setf current-entry
(make-fd-entry fd
))
159 (%add-event event-base event current-entry
)
160 (unless (monitor-fd (mux-of event-base
) current-entry
)
161 (%remove-event event-base event
))))
165 (defmethod add-timeout ((event-base event-base
) function timeout
&key persistent
)
167 (%add-event event-base
(make-event nil
:timeout function persistent
168 (abs-timeout timeout
)
169 (normalize-timeout timeout
))))
172 (defun calc-possible-edge-change-when-removing (fd-entry event-type
)
173 (cond ((and (eql event-type
:read
)
174 (not (queue-empty-p (fd-entry-read-events fd-entry
))))
176 ((and (eql event-type
:write
)
177 (not (queue-empty-p (fd-entry-write-events fd-entry
))))
181 (defmethod remove-event ((event-base event-base
) event
)
182 (check-type (event-type event
) event-type
)
184 (let* ((fd (event-fd event
))
185 (current-entry (fd-entry-of event-base fd
))
189 (setf edge-change
(calc-possible-edge-change-when-removing
190 current-entry
(event-type event
)))
191 (%remove-event event-base event
)
192 (if (fd-entry-empty-p current-entry
)
193 (unmonitor-fd (mux-of event-base
) current-entry
)
195 (setf (fd-entry-edge-change current-entry
) edge-change
)
196 (update-fd (mux-of event-base
) current-entry
)
197 (setf (fd-entry-edge-change current-entry
) nil
))))
198 (%remove-event event-base event
)))
202 (defmethod remove-events ((event-base event-base
) event-list
)
203 (dolist (ev event-list
)
204 (remove-event event-base ev
)))
207 (defmacro with-fd-handler
((event-base fd event-type function
210 (let ((event (gensym "EVENT-")))
214 (setf ,event
(add-fd ,event-base
,fd
,event-type
,function
219 (remove-event ,event-base
,event
))))))
222 (defmethod event-dispatch :around
((event-base event-base
) &key timeout only-once
)
223 (setf (exit-p event-base
) nil
)
224 (setf (main-timeout-of event-base
) nil
)
226 (exit-event-loop event-base
:delay timeout
)
228 (unless (event-base-empty-p event-base
)
229 (call-next-method event-base
:timeout timeout
230 :only-once only-once
)))
233 (defmethod event-dispatch ((event-base event-base
) &key timeout only-once
)
234 (with-accessors ((mux mux-of
) (exit-p exit-p
)
235 (fds fds-of
) (timeouts timeouts-of
)) event-base
236 (let* ((min-event-timeout (events-calc-min-rel-timeout timeouts
))
237 (actual-timeout (calc-min-timeout min-event-timeout timeout
))
241 :with deletion-list
:= ()
242 :with dispatch-list
:= ()
245 (setf before
(et::gettime
))
247 (event-recalc-abs-timeout ev before
))
248 (queue-head timeouts
))
249 (dispatch-fd-events-once event-base actual-timeout
)
250 (setf after
(et::gettime
))
252 (let ((main-timeout (main-timeout-of event-base
)))
254 (remove-event event-base main-timeout
)
255 (setf (main-timeout-of event-base
) nil
)))
257 (setf (values deletion-list dispatch-list
)
258 (filter-expired-events (expired-events timeouts after
)))
259 (dispatch-timeouts dispatch-list
)
260 (remove-events event-base deletion-list
)
262 (queue-sort timeouts
#'< #'event-abs-timeout
)
266 (event-base-empty-p event-base
))
267 :do
(loop-finish)))))
270 (defun dispatch-fd-events-once (event-base timeout
)
271 (with-accessors ((mux mux-of
) (fds fds-of
)
272 (timeouts timeouts-of
)) event-base
273 (let ((deletion-list ())
274 (fd-events (harvest-events mux timeout
)))
275 (dolist (ev fd-events
)
276 (destructuring-bind (fd ev-types
) ev
277 (let ((fd-entry (fd-entry-of event-base fd
)))
280 (when (member :error ev-types
)
281 (dispatch-error-events fd-entry
)
282 (setf deletion-list
(fd-entry-all-events fd-entry
)))
283 (when (member :read ev-types
)
284 (dispatch-read-events fd-entry
)
287 (fd-entry-one-shot-events fd-entry
:read
))))
288 (when (member :write ev-types
)
289 (dispatch-write-events fd-entry
)
292 (fd-entry-one-shot-events fd-entry
:write
)))))
293 (warn "Got spurious event for non-monitored FD: ~A" fd
)))))
294 (dolist (ev deletion-list
)
295 (remove-event event-base ev
)))))
298 (defun expired-events (queue now
)
301 (and to
(<= to now
)))
302 #'event-abs-timeout
))
305 (defun filter-expired-events (events)
306 (let ((deletion-list ())
309 (push ev dispatch-list
)
310 (unless (event-persistent-p ev
)
311 (push ev deletion-list
)))
312 (values deletion-list dispatch-list
)))
315 (defun dispatch-timeouts (dispatch-list)
316 (dolist (ev dispatch-list
)
317 (funcall (event-handler ev
) nil
:timeout
)))
320 (defun events-calc-min-rel-timeout (timeouts)
321 (let* ((now (et::gettime
))
322 (first-valid-event (find-if #'(lambda (to)
323 (or (null to
) (< now to
)))
324 (queue-head timeouts
)
325 :key
#'event-abs-timeout
)))
326 (when (and first-valid-event
327 (event-abs-timeout first-valid-event
))
328 (- (event-abs-timeout first-valid-event
) now
))))
331 (defun dispatch-error-events (fd-entry)
332 (dolist (ev (queue-head (fd-entry-error-events fd-entry
)))
333 (funcall (event-handler ev
) (fd-entry-fd fd-entry
) :error
)))
336 (defun dispatch-read-events (fd-entry)
337 (dolist (ev (queue-head (fd-entry-read-events fd-entry
)))
338 (funcall (event-handler ev
) (fd-entry-fd fd-entry
) :read
)))
341 (defun dispatch-write-events (fd-entry)
342 (dolist (ev (queue-head (fd-entry-write-events fd-entry
)))
343 (funcall (event-handler ev
) (fd-entry-fd fd-entry
) :write
)))
349 '(member :read
:write
:error
))
352 (deftype event-type
()
353 '(or fd-event
(member :timeout
)))
357 (:constructor make-fd-entry
(fd))
359 (fd 0 :type unsigned-byte
)
360 (edge-change nil
:type symbol
)
361 (read-events (make-queue) :type queue
)
362 (write-events (make-queue) :type queue
)
363 (error-events (make-queue) :type queue
))
366 (defun fd-entry-event-list (fd-entry event-type
)
367 (check-type fd-entry fd-entry
)
368 (check-type event-type fd-event
)
370 (:read
(fd-entry-read-events fd-entry
))
371 (:write
(fd-entry-write-events fd-entry
))
372 (:error
(fd-entry-error-events fd-entry
))))
375 (defun (setf fd-entry-event-list
) (fd-entry event-list event-type
)
376 (check-type fd-entry fd-entry
)
377 (check-type event-type fd-event
)
379 (:read
(setf (fd-entry-read-events fd-entry
) event-list
))
380 (:write
(setf (fd-entry-write-events fd-entry
) event-list
))
381 (:error
(setf (fd-entry-error-events fd-entry
) event-list
))))
384 (defun fd-entry-empty-p (fd-entry)
385 (and (queue-empty-p (fd-entry-read-events fd-entry
))
386 (queue-empty-p (fd-entry-write-events fd-entry
))
387 (queue-empty-p (fd-entry-error-events fd-entry
))))
390 (defun fd-entry-add-event (fd-entry event
)
391 (queue-enqueue (fd-entry-event-list fd-entry
(event-type event
))
395 (defun fd-entry-del-event (fd-entry event
)
396 (queue-delete (fd-entry-event-list fd-entry
(event-type event
))
400 (defun fd-entry-all-events (fd-entry)
401 (append (queue-head (fd-entry-read-events fd-entry
))
402 (queue-head (fd-entry-write-events fd-entry
))
403 (queue-head (fd-entry-error-events fd-entry
))))
406 (defun fd-entry-one-shot-events (fd-entry event-type
)
407 (remove-if #'event-persistent-p
408 (queue-head (fd-entry-event-list fd-entry event-type
))))
414 (:constructor make-event
(fd type handler persistent-p
415 abs-timeout timeout
))
417 ;; a file descriptor or nil in case of a timeout
418 (fd nil
:type
(or null unsigned-byte
))
419 (type nil
:type
(or null event-type
))
420 (handler nil
:type
(or null function
))
421 ;; if an event is not persistent it is removed
422 ;; after it occurs or if it times out
423 (persistent-p nil
:type boolean
)
424 (abs-timeout nil
:type
(or null timeout
))
425 (timeout nil
:type
(or null timeout
)))
428 (defun event-timed-out-p (event timeout
)
429 (let ((ev-to (event-abs-timeout event
)))
430 (when (and ev-to timeout
)
434 (defun event-recalc-abs-timeout (event now
)
435 (setf (event-abs-timeout event
)
436 (+ now
(event-timeout event
))))
442 (defun get-fd-limit ()
443 (let ((fd-limit (et:get-resource-limit et
:rlimit-nofile
)))
444 (unless (eql fd-limit et
:rlim-infinity
)
448 (defclass multiplexer
()
450 (fd-limit :initform
(get-fd-limit)
452 :reader fd-limit-of
)))
455 (defgeneric monitor-fd
(mux fd-entry
)
456 (:method
((mux multiplexer
) fd-entry
)
457 (declare (ignore fd-entry
))
461 (defgeneric update-fd
(mux fd-entry
)
462 (:method
((mux multiplexer
) fd-entry
)
463 (declare (ignore fd-entry
))
467 (defgeneric unmonitor-fd
(mux fd-entry
)
468 (:method
((mux multiplexer
) fd-entry
)
469 (declare (ignore fd-entry
))
473 (defgeneric harvest-events
(mux timeout
))
476 (defgeneric close-multiplexer
(mux)
477 (:method-combination progn
:most-specific-last
)
478 (:method progn
((mux multiplexer
))
479 (when (slot-value mux
'fd
)
480 (et:close
(fd-of mux
))
481 (setf (slot-value mux
'fd
) nil
))
485 (defmethod monitor-fd :around
((mux multiplexer
) fd-entry
)
486 (if (ignore-and-print-errors (call-next-method))
488 (warn "FD monitoring failed for FD ~A."
489 (fd-entry-fd fd-entry
))))
492 (defmethod update-fd :around
((mux multiplexer
) fd-entry
)
493 (if (ignore-and-print-errors (call-next-method))
495 (warn "FD status update failed for FD ~A."
496 (fd-entry-fd fd-entry
))))
499 (defmethod unmonitor-fd :around
((mux multiplexer
) fd-entry
)
500 (if (ignore-and-print-errors (call-next-method))
502 (warn "FD unmonitoring failed for FD ~A."
503 (fd-entry-fd fd-entry
))))
506 (defmacro define-multiplexer
(name priority superclasses slots
&rest options
)
508 (defclass ,name
,superclasses
,slots
,@options
)
509 (pushnew (cons ,priority
',name
)
510 *available-multiplexers
*)))
516 ;; FIXME: Until a way to autodetect platform features is implemented
517 (define-constant et
::pollrdhup
0)
519 (defun wait-until-fd-ready (fd event-type
&optional timeout
)
520 ;; FIXME: this conses badly for its return value
521 ;; solution: (1) use a fixnum bitmap, just like C
522 ;; (2) if we really want to expose only lists of keyword as the API,
523 ;; cache a bitmap-indexed vector of all the combinations (sharing tails)
524 (flet ((choose-poll-flags (type)
526 (:read
(logior et
:pollin et
::pollrdhup et
:pollpri
))
527 (:write
(logior et
:pollout et
:pollhup
))
528 (:read-write
(logior et
:pollin et
::pollrdhup et
:pollpri
529 et
:pollout et
:pollhup
)))))
531 (with-foreign-object (pollfd 'et
:pollfd
)
532 (et:bzero pollfd et
:size-of-pollfd
)
533 (with-foreign-slots ((et:fd et
:events et
:revents
) pollfd et
:pollfd
)
535 et
:events
(choose-poll-flags event-type
))
537 (let ((ret (et:repeat-upon-eintr
538 (et:poll pollfd
1 (timeout->milisec timeout
)))))
540 (return-from wait-until-fd-ready
'(:timeout
))))
542 (return-from wait-until-fd-ready
'(:error
))))
543 (flags-case et
:revents
544 ((et:pollout et
:pollhup
) (push :write status
))
545 ((et:pollin et
::pollrdhup et
:pollpri
) (push :read status
))
546 ((et:pollerr et
:pollnval
) (push :error status
)))
547 (return-from wait-until-fd-ready status
))))))
549 (defun fd-ready-p (fd &optional
(event-type :read
))
550 (not (member :timeout
(wait-until-fd-ready fd event-type
0) :test
#'eq
)))