1 ;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp -*-
3 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
4 ; Copyright (C) 2006,2007 by Stelian Ionescu ;
6 ; This program is free software; you can redistribute it and/or modify ;
7 ; it under the terms of the GNU General Public License as published by ;
8 ; the Free Software Foundation; either version 2 of the License, or ;
9 ; (at your option) any later version. ;
11 ; This program is distributed in the hope that it will be useful, ;
12 ; but WITHOUT ANY WARRANTY; without even the implied warranty of ;
13 ; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ;
14 ; GNU General Public License for more details. ;
16 ; You should have received a copy of the GNU General Public License ;
17 ; along with this program; if not, write to the ;
18 ; Free Software Foundation, Inc., ;
19 ; 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ;
20 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
22 (in-package :io.multiplex
)
;; Multiplexer registry variables.  They are defined inside EVAL-WHEN so
;; that they exist at compile time as well as at load and evaluation time.
(eval-when (:execute :load-toplevel :compile-toplevel)
  (defvar *available-multiplexers* nil
    "List extended by DEFINE-MULTIPLEXER with (priority . class-name) conses.")
  (defvar *best-available-multiplexer* nil
    "Class designator handed to MAKE-INSTANCE for the MUX slot of EVENT-BASE."))
;; Class EVENT-BASE: the state of one event loop.  The visible slots are
;;   MUX          - an instance of *BEST-AVAILABLE-MULTIPLEXER*
;;   FDS          - a hash table using the EQL test
;;   TIMEOUTS     - a queue made with MAKE-QUEUE
;;   MAIN-TIMEOUT - initially NIL, accessed through MAIN-TIMEOUT-OF
;; NOTE(review): the form is unbalanced as shown; the remaining slot options
;; (presumably the accessors MUX-OF, FDS-OF and TIMEOUTS-OF used elsewhere in
;; this file) and any further slots are not visible in this view.
31 (defclass event-base
()
32 ((mux :initform
(make-instance *best-available-multiplexer
*)
34 (fds :initform
(make-hash-table :test
'eql
)
36 (timeouts :initform
(make-queue)
38 (main-timeout :initform nil
39 :accessor main-timeout-of
)
;; PRINT-OBJECT method for EVENT-BASE.  Prints an unreadable object (no type
;; prefix, with identity) whose text is produced by the FORMAT call below;
;; the first ~A receives the number of entries in the base's FDS table.
;; NOTE(review): the control string contains two ~A directives but only one
;; argument is visible, and the form is not closed -- the end of this method
;; appears to be missing from this view.
44 (defmethod print-object ((base event-base
) stream
)
45 (print-unreadable-object (base stream
:type nil
:identity t
)
46 (format stream
"event base, ~A FDs monitored, using: ~A"
47 (hash-table-count (fds-of base
))
;; Generic function CLOSE-EVENT-BASE.  The method on EVENT-BASE first closes
;; the base's multiplexer with CLOSE-MULTIPLEXER, then maps a lambda that
;; calls SLOT-MAKUNBOUND on EVENT-BASE over a list of slot names.
;; NOTE(review): the list of slot names and the closing parentheses are not
;; visible in this view.
51 (defgeneric close-event-base
(event-base)
52 (:method
((event-base event-base
))
53 (with-accessors ((mux mux-of
)) event-base
54 (close-multiplexer mux
)
55 (mapc #'(lambda (slot)
56 (slot-makunbound event-base slot
))
(defgeneric add-fd (base fd event-type function
                    &key timeout persistent)
  (:documentation
   "Register FUNCTION as the handler for events of kind EVENT-TYPE on file
descriptor FD in BASE.  TIMEOUT and PERSISTENT configure the event that is
created for the registration."))
(defgeneric add-timeout (event-base function timeout
                         &key persistent)
  (:documentation
   "Add to EVENT-BASE an event of type :TIMEOUT, without a file descriptor,
whose handler is FUNCTION.  TIMEOUT and PERSISTENT are recorded in the event."))
(defgeneric remove-event (event-base event)
  (:documentation
   "Remove EVENT from EVENT-BASE."))
(defgeneric remove-events (event-base event-list)
  (:documentation
   "Remove each event in EVENT-LIST from EVENT-BASE."))
(defgeneric event-dispatch (event-base &key timeout only-once)
  (:documentation
   "Dispatch the events registered on EVENT-BASE.  Accepts the keyword
arguments TIMEOUT and ONLY-ONCE; their exact effect is defined by the
methods."))
(defgeneric exit-event-loop (event-base &key delay)
  (:documentation
   "Arrange for the event loop of EVENT-BASE to stop after DELAY.")
  (:method ((event-base event-base) &key delay)
    ;; The exit request is a one-shot timeout event whose handler raises
    ;; the EXIT-P flag of the base; the event is remembered as the base's
    ;; main timeout and is also the value returned.
    (flet ((request-exit (fd event-type)
             (declare (ignore fd event-type))
             (setf (exit-p event-base) t)))
      (let ((exit-event (add-timeout event-base #'request-exit delay
                                     :persistent nil)))
        (setf (main-timeout-of event-base) exit-event)))))
(defgeneric event-base-empty-p (event-base)
  (:documentation
   "True when EVENT-BASE has no entries in its FD table and its timeout
queue is empty.")
  (:method ((event-base event-base))
    ;; HASH-TABLE-COUNT is a non-negative integer, so PLUSP is exactly the
    ;; negation of ZEROP here.
    (if (plusp (hash-table-count (fds-of event-base)))
        nil
        (queue-empty-p (timeouts-of event-base)))))
(defgeneric fd-entry-of (event-base fd)
  (:documentation
   "Look up FD in the FD table of EVENT-BASE.  Returns the two values of
GETHASH: the stored entry (or NIL) and a flag telling whether FD was present.")
  (:method ((event-base event-base) fd)
    (let ((table (fds-of event-base)))
      (gethash fd table))))
;; Internal helper called by the ADD-FD and ADD-TIMEOUT methods.
;; When EVENT has a timeout it is inserted into the base's timeout queue,
;; ordered by EVENT-ABS-TIMEOUT under #'<.  The FD-related forms then add
;; EVENT to FD-ENTRY and store FD-ENTRY in the FDS table under the event's fd.
;; NOTE(review): the LET form is left open here and any guard around the FD
;; branch, as well as the return form, is not visible -- lines appear to be
;; missing from this view.
97 (defun %add-event
(event-base event
&optional fd-entry
)
98 (with-accessors ((fds fds-of
) (timeouts timeouts-of
)) event-base
99 (when (event-timeout event
)
100 ;; add the event to the timeout queue
101 (queue-sorted-insert timeouts event
#'< #'event-abs-timeout
))
102 (let ((fd (event-fd event
)))
103 ;; if it's an FD event add it to its fd-entry in the FDs hash-table
104 ;; if there's no such fd-entry, create it
106 (fd-entry-add-event fd-entry event
)
107 (setf (gethash fd fds
) fd-entry
))
;; Internal helper, the counterpart of %ADD-EVENT.
;; When EVENT has a timeout it is deleted from the base's timeout queue.
;; The FD-related forms look up the fd-entry stored under the event's fd,
;; delete EVENT from it with FD-ENTRY-DEL-EVENT and then test whether the
;; entry has become empty.
;; NOTE(review): the guard around the FD branch, the action taken for an
;; empty entry and the closing parentheses are not visible in this view.
111 (defun %remove-event
(event-base event
)
112 (with-accessors ((fds fds-of
) (timeouts timeouts-of
)) event-base
113 (when (event-timeout event
)
114 ;; remove the event from the timeout queue
115 (queue-delete timeouts event
))
116 (let ((fd (event-fd event
)))
117 ;; if it's an FD event remove it from its fd-entry
118 ;; if the fd-entry is then empty, remove it
120 (let ((fd-entry (gethash fd fds
)))
122 (fd-entry-del-event fd-entry event
)
123 (when (fd-entry-empty-p fd-entry
)
;; Used by ADD-FD before an event is added to an existing fd-entry.
;; The COND tests two situations: EVENT-TYPE is :READ and the entry's read
;; queue is empty, or EVENT-TYPE is :WRITE and its write queue is empty.
;; NOTE(review): the value produced by each clause and the closing
;; parentheses are not visible in this view.
128 (defun calc-possible-edge-change-when-adding (fd-entry event-type
)
129 (cond ((and (eql event-type
:read
)
130 (queue-empty-p (fd-entry-read-events fd-entry
)))
132 ((and (eql event-type
:write
)
133 (queue-empty-p (fd-entry-write-events fd-entry
)))
;; ADD-FD method for EVENT-BASE.
;; Visible steps:
;;  * EVENT-TYPE is checked against the type FD-EVENT;
;;  * an error is signalled when FD is greater than the multiplexer's
;;    FD-LIMIT-OF value;
;;  * the event is built with MAKE-EVENT from FD, EVENT-TYPE, FUNCTION,
;;    PERSISTENT, (ABS-TIMEOUT TIMEOUT) and (NORMALIZE-TIMEOUT TIMEOUT);
;;  * one group of forms computes an edge change with
;;    CALC-POSSIBLE-EDGE-CHANGE-WHEN-ADDING, adds the event, and calls
;;    UPDATE-FD with FD-ENTRY-EDGE-CHANGE set for the duration of the call;
;;  * another group creates a fresh entry with MAKE-FD-ENTRY, adds the event,
;;    calls MONITOR-FD and undoes the addition with %REMOVE-EVENT when
;;    MONITOR-FD returns false.
;; NOTE(review): the conditional that selects between the two groups, the
;; binding of EDGE-CHANGE and the return form are not visible in this view;
;; presumably the first group handles an FD that already has an entry.
;; NOTE(review): FD-LIMIT-OF is passed to `>' unconditionally -- confirm it
;; can never be NIL.
137 (defmethod add-fd ((event-base event-base
) fd event-type function
&key timeout persistent
)
138 (check-type event-type fd-event
)
140 (when (> fd
(fd-limit-of (mux-of event-base
)))
141 (error "Cannot add such a large FD: ~A" fd
))
142 (let ((current-entry (fd-entry-of event-base fd
))
143 (event (make-event fd event-type function persistent
144 (abs-timeout timeout
)
145 (normalize-timeout timeout
)))
149 (setf edge-change
(calc-possible-edge-change-when-adding
150 current-entry event-type
))
151 (%add-event event-base event current-entry
)
153 (setf (fd-entry-edge-change current-entry
) edge-change
)
154 (update-fd (mux-of event-base
) current-entry
)
155 (setf (fd-entry-edge-change current-entry
) nil
)))
157 (setf current-entry
(make-fd-entry fd
))
158 (%add-event event-base event current-entry
)
159 (unless (monitor-fd (mux-of event-base
) current-entry
)
160 (%remove-event event-base event
))))
(defmethod add-timeout ((event-base event-base) function timeout
                        &key persistent)
  "Build a :TIMEOUT event (no file descriptor) handled by FUNCTION and add it
to EVENT-BASE with %ADD-EVENT, whose result is returned."
  (let ((timeout-event (make-event nil :timeout function persistent
                                   (abs-timeout timeout)
                                   (normalize-timeout timeout))))
    (%add-event event-base timeout-event)))
;; Used by REMOVE-EVENT before an event is removed from an fd-entry.
;; The COND tests two situations: EVENT-TYPE is :READ and the entry's read
;; queue is not empty, or EVENT-TYPE is :WRITE and its write queue is not
;; empty.
;; NOTE(review): the value produced by each clause and the closing
;; parentheses are not visible in this view.
171 (defun calc-possible-edge-change-when-removing (fd-entry event-type
)
172 (cond ((and (eql event-type
:read
)
173 (not (queue-empty-p (fd-entry-read-events fd-entry
))))
175 ((and (eql event-type
:write
)
176 (not (queue-empty-p (fd-entry-write-events fd-entry
))))
;; REMOVE-EVENT method for EVENT-BASE.
;; Visible steps:
;;  * the type of EVENT is checked against the type EVENT-TYPE;
;;  * the event's fd and the matching fd-entry are looked up;
;;  * one group of forms computes an edge change with
;;    CALC-POSSIBLE-EDGE-CHANGE-WHEN-REMOVING, removes the event with
;;    %REMOVE-EVENT, and then either calls UNMONITOR-FD (entry now empty) or
;;    calls UPDATE-FD with FD-ENTRY-EDGE-CHANGE set for the duration of the
;;    call;
;;  * a final form just calls %REMOVE-EVENT.
;; NOTE(review): the conditional that chooses between the FD path and the
;; final %REMOVE-EVENT call, and the binding of EDGE-CHANGE, are not visible
;; in this view.
180 (defmethod remove-event ((event-base event-base
) event
)
181 (check-type (event-type event
) event-type
)
183 (let* ((fd (event-fd event
))
184 (current-entry (fd-entry-of event-base fd
))
188 (setf edge-change
(calc-possible-edge-change-when-removing
189 current-entry
(event-type event
)))
190 (%remove-event event-base event
)
191 (if (fd-entry-empty-p current-entry
)
192 (unmonitor-fd (mux-of event-base
) current-entry
)
194 (setf (fd-entry-edge-change current-entry
) edge-change
)
195 (update-fd (mux-of event-base
) current-entry
)
196 (setf (fd-entry-edge-change current-entry
) nil
))))
197 (%remove-event event-base event
)))
(defmethod remove-events ((event-base event-base) event-list)
  "Call REMOVE-EVENT on EVENT-BASE for each element of EVENT-LIST, in order.
Returns NIL."
  (loop :for ev :in event-list
        :do (remove-event event-base ev)))
;; Macro WITH-FD-HANDLER.  The visible part of the expander creates a
;; gensym'd variable for the event; the expansion sets that variable to the
;; result of an ADD-FD call built from EVENT-BASE, FD, EVENT-TYPE and
;; FUNCTION, and contains a REMOVE-EVENT call on the same variable.
;; NOTE(review): the rest of the lambda list, the body handling and the
;; form that sequences ADD-FD and REMOVE-EVENT are not visible in this view.
206 (defmacro with-fd-handler
((event-base fd event-type function
209 (let ((event (gensym "EVENT-")))
213 (setf ,event
(add-fd ,event-base
,fd
,event-type
,function
218 (remove-event ,event-base
,event
))))))
(defmethod event-dispatch :around ((event-base event-base)
                                   &key timeout only-once)
  "Prepare EVENT-BASE for a dispatch run, then delegate to the next method.
The exit flag and the main timeout are reset, EXIT-EVENT-LOOP is called with
TIMEOUT as its delay, and the next method runs only if the base is not empty."
  (setf (exit-p event-base) nil
        (main-timeout-of event-base) nil)
  (exit-event-loop event-base :delay timeout)
  (when (not (event-base-empty-p event-base))
    (call-next-method event-base
                      :timeout timeout
                      :only-once only-once)))
;; Primary EVENT-DISPATCH method for EVENT-BASE.
;; The visible forms derive a wait time from the timeout queue and TIMEOUT
;; (EVENTS-CALC-MIN-REL-TIMEOUT, CALC-MIN-TIMEOUT) and then, inside a LOOP:
;;  * take a timestamp with GETTIME and call EVENT-RECALC-ABS-TIMEOUT with it;
;;  * run DISPATCH-FD-EVENTS-ONCE with the computed wait time;
;;  * take a second timestamp;
;;  * remove the base's main timeout event and reset MAIN-TIMEOUT-OF to NIL;
;;  * split the expired timeout events into a deletion list and a dispatch
;;    list (EXPIRED-EVENTS, FILTER-EXPIRED-EVENTS), dispatch the latter and
;;    remove the former;
;;  * re-sort the timeout queue by EVENT-ABS-TIMEOUT;
;;  * leave the loop through LOOP-FINISH.
;; NOTE(review): the LOOP header, several clauses and the conditions guarding
;; these steps are missing from this view, so the exact nesting and the
;; termination condition cannot be confirmed here.
232 (defmethod event-dispatch ((event-base event-base
) &key timeout only-once
)
233 (with-accessors ((mux mux-of
) (exit-p exit-p
)
234 (fds fds-of
) (timeouts timeouts-of
)) event-base
235 (let* ((min-event-timeout (events-calc-min-rel-timeout timeouts
))
236 (actual-timeout (calc-min-timeout min-event-timeout timeout
))
240 :with deletion-list
:= ()
241 :with dispatch-list
:= ()
244 (setf before
(gettime))
246 (event-recalc-abs-timeout ev before
))
247 (queue-head timeouts
))
248 (dispatch-fd-events-once event-base actual-timeout
)
249 (setf after
(gettime))
251 (let ((main-timeout (main-timeout-of event-base
)))
253 (remove-event event-base main-timeout
)
254 (setf (main-timeout-of event-base
) nil
)))
256 (multiple-value-setq (deletion-list dispatch-list
)
257 (filter-expired-events (expired-events timeouts after
)))
258 (dispatch-timeouts dispatch-list
)
259 (remove-events event-base deletion-list
)
261 (queue-sort timeouts
#'< #'event-abs-timeout
)
265 (event-base-empty-p event-base
))
266 :do
(loop-finish)))))
;; Performs one round of FD event dispatching for EVENT-BASE.
;; HARVEST-EVENTS is called on the multiplexer with TIMEOUT; each element of
;; its result is destructured as (fd ev-types) and the matching fd-entry is
;; looked up.  Depending on the members of EV-TYPES:
;;   :ERROR - error handlers run and DELETION-LIST is set to all of the
;;            entry's events;
;;   :READ  - read handlers run; the entry's one-shot read events are
;;            collected;
;;   :WRITE - write handlers run; the entry's one-shot write events are
;;            collected.
;; A warning is issued for an FD that is not monitored.  Finally every event
;; in DELETION-LIST is removed with REMOVE-EVENT.
;; NOTE(review): the test for a missing fd-entry and the forms that add the
;; one-shot events to DELETION-LIST are not visible in this view.
269 (defun dispatch-fd-events-once (event-base timeout
)
270 (with-accessors ((mux mux-of
) (fds fds-of
)
271 (timeouts timeouts-of
)) event-base
272 (let ((deletion-list ())
273 (fd-events (harvest-events mux timeout
)))
274 (dolist (ev fd-events
)
275 (destructuring-bind (fd ev-types
) ev
276 (let ((fd-entry (fd-entry-of event-base fd
)))
279 (when (member :error ev-types
)
280 (dispatch-error-events fd-entry
)
281 (setf deletion-list
(fd-entry-all-events fd-entry
)))
282 (when (member :read ev-types
)
283 (dispatch-read-events fd-entry
)
286 (fd-entry-one-shot-events fd-entry
:read
))))
287 (when (member :write ev-types
)
288 (dispatch-write-events fd-entry
)
291 (fd-entry-one-shot-events fd-entry
:write
)))))
292 (warn "Got spurious event for non-monitored FD: ~A" fd
)))))
293 (dolist (ev deletion-list
)
294 (remove-event event-base ev
)))))
;; Selects events from QUEUE relative to the time NOW.  The visible pieces
;; are a predicate that is true for a non-NIL timeout TO with TO <= NOW, and
;; EVENT-ABS-TIMEOUT, apparently used as the key.
;; NOTE(review): the sequence function that applies the predicate to QUEUE is
;; not visible in this view, so the exact result cannot be confirmed.
297 (defun expired-events (queue now
)
300 (and to
(<= to now
)))
301 #'event-abs-timeout
))
;; Splits events into two lists, returned as two values in the order
;; DELETION-LIST, DISPATCH-LIST.  Every visited event is pushed onto
;; DISPATCH-LIST; events for which EVENT-PERSISTENT-P is false are pushed
;; onto DELETION-LIST as well.
;; NOTE(review): the binding of DISPATCH-LIST and the iteration form over
;; EVENTS are not visible in this view.
304 (defun filter-expired-events (events)
305 (let ((deletion-list ())
308 (push ev dispatch-list
)
309 (unless (event-persistent-p ev
)
310 (push ev deletion-list
)))
311 (values deletion-list dispatch-list
)))
(defun dispatch-timeouts (dispatch-list)
  "Call the handler of every event in DISPATCH-LIST, in order, with the
arguments NIL (no file descriptor) and :TIMEOUT.  Returns NIL."
  (loop :for ev :in dispatch-list
        :do (funcall (event-handler ev) nil :timeout)))
(defun events-calc-min-rel-timeout (timeouts)
  "Return how far in the future, relative to GETTIME, the first still-pending
event of the TIMEOUTS queue is due.  An event is still pending when its
absolute timeout is NIL or later than the current time.  Returns NIL when no
such event exists or when the one found has no absolute timeout."
  (let ((now (gettime)))
    (flet ((still-pending-p (abs-timeout)
             (or (null abs-timeout) (< now abs-timeout))))
      (let* ((candidate (find-if #'still-pending-p (queue-head timeouts)
                                 :key #'event-abs-timeout))
             (deadline (and candidate (event-abs-timeout candidate))))
        (when deadline
          (- deadline now))))))
(defun dispatch-error-events (fd-entry)
  "Call the handler of every event in FD-ENTRY's error queue, in order, with
the entry's file descriptor and :ERROR.  Returns NIL."
  (let ((error-queue (fd-entry-error-events fd-entry)))
    (loop :for ev :in (queue-head error-queue)
          :do (funcall (event-handler ev) (fd-entry-fd fd-entry) :error))))
(defun dispatch-read-events (fd-entry)
  "Call the handler of every event in FD-ENTRY's read queue, in order, with
the entry's file descriptor and :READ.  Returns NIL."
  (let ((read-queue (fd-entry-read-events fd-entry)))
    (loop :for ev :in (queue-head read-queue)
          :do (funcall (event-handler ev) (fd-entry-fd fd-entry) :read))))
(defun dispatch-write-events (fd-entry)
  "Call the handler of every event in FD-ENTRY's write queue, in order, with
the entry's file descriptor and :WRITE.  Returns NIL."
  (let ((write-queue (fd-entry-write-events fd-entry)))
    (loop :for ev :in (queue-head write-queue)
          :do (funcall (event-handler ev) (fd-entry-fd fd-entry) :write))))
348 '(member :read
:write
:error
))
(deftype event-type ()
  "Any FD-EVENT kind, or the keyword :TIMEOUT."
  ;; (EQL :TIMEOUT) denotes the same one-element type as (MEMBER :TIMEOUT).
  '(or fd-event (eql :timeout)))
356 (:constructor make-fd-entry
(fd))
359 (edge-change nil
:type symbol
)
360 (read-events (make-queue) :type queue
)
361 (write-events (make-queue) :type queue
)
362 (error-events (make-queue) :type queue
))
;; Returns the event queue of FD-ENTRY that corresponds to EVENT-TYPE:
;; the read queue for :READ, the write queue for :WRITE and the error queue
;; for :ERROR.  FD-ENTRY must be of type FD-ENTRY and EVENT-TYPE of type
;; FD-EVENT; both are verified with CHECK-TYPE.
;; NOTE(review): the head of the dispatching form (between the CHECK-TYPE
;; forms and the clauses) is not visible in this view.
365 (defun fd-entry-event-list (fd-entry event-type
)
366 (check-type fd-entry fd-entry
)
367 (check-type event-type fd-event
)
369 (:read
(fd-entry-read-events fd-entry
))
370 (:write
(fd-entry-write-events fd-entry
))
371 (:error
(fd-entry-error-events fd-entry
))))
;; SETF function for FD-ENTRY-EVENT-LIST: stores EVENT-LIST in the read,
;; write or error queue slot of FD-ENTRY according to EVENT-TYPE (:READ,
;; :WRITE or :ERROR).  FD-ENTRY and EVENT-TYPE are verified with CHECK-TYPE.
;; NOTE(review): a SETF function receives the new value as its FIRST
;; argument, but the first parameter here is named FD-ENTRY and EVENT-LIST
;; comes second -- confirm that the parameter order is intended.
;; NOTE(review): the head of the dispatching form (between the CHECK-TYPE
;; forms and the clauses) is not visible in this view.
374 (defun (setf fd-entry-event-list
) (fd-entry event-list event-type
)
375 (check-type fd-entry fd-entry
)
376 (check-type event-type fd-event
)
378 (:read
(setf (fd-entry-read-events fd-entry
) event-list
))
379 (:write
(setf (fd-entry-write-events fd-entry
) event-list
))
380 (:error
(setf (fd-entry-error-events fd-entry
) event-list
))))
(defun fd-entry-empty-p (fd-entry)
  "True when the read, write and error queues of FD-ENTRY are all empty.
The queues are examined in that order and the first non-empty one ends the
test with NIL."
  (cond ((not (queue-empty-p (fd-entry-read-events fd-entry))) nil)
        ((not (queue-empty-p (fd-entry-write-events fd-entry))) nil)
        (t (queue-empty-p (fd-entry-error-events fd-entry)))))
;; Adds EVENT to FD-ENTRY: QUEUE-ENQUEUE is applied to the queue that
;; FD-ENTRY-EVENT-LIST selects for the event's type.
;; NOTE(review): the remaining argument(s) of QUEUE-ENQUEUE and the closing
;; parentheses are not visible in this view.
389 (defun fd-entry-add-event (fd-entry event
)
390 (queue-enqueue (fd-entry-event-list fd-entry
(event-type event
))
;; Removes EVENT from FD-ENTRY: QUEUE-DELETE is applied to the queue that
;; FD-ENTRY-EVENT-LIST selects for the event's type.
;; NOTE(review): the remaining argument(s) of QUEUE-DELETE and the closing
;; parentheses are not visible in this view.
394 (defun fd-entry-del-event (fd-entry event
)
395 (queue-delete (fd-entry-event-list fd-entry
(event-type event
))
(defun fd-entry-all-events (fd-entry)
  "Return one list holding the read, write and error events of FD-ENTRY, in
that order.  As with any APPEND call, the last list (the error events) is
shared with the result rather than copied."
  (let ((readers (queue-head (fd-entry-read-events fd-entry)))
        (writers (queue-head (fd-entry-write-events fd-entry)))
        (errors  (queue-head (fd-entry-error-events fd-entry))))
    (append readers writers errors)))
(defun fd-entry-one-shot-events (fd-entry event-type)
  "Return the events registered in FD-ENTRY for EVENT-TYPE that are not
persistent."
  (let ((registered (queue-head (fd-entry-event-list fd-entry event-type))))
    (remove-if #'event-persistent-p registered)))
413 (:constructor make-event
(fd type handler persistent-p
414 abs-timeout timeout
))
416 ;; a file descriptor or nil in case of a timeout
417 (fd nil
:type
(or null fixnum
))
418 (type nil
:type
(or null event-type
))
419 (handler nil
:type
(or null function
))
420 ;; if an event is not persistent it is removed
421 ;; after it occurs or if it times out
422 (persistent-p nil
:type boolean
)
423 (abs-timeout nil
:type
(or null timeout
))
424 (timeout nil
:type
(or null timeout
)))
;; Tests EVENT against TIMEOUT.  EV-TO is bound to the event's absolute
;; timeout and the body runs only when both EV-TO and TIMEOUT are non-NIL.
;; NOTE(review): the comparison performed inside WHEN and the closing
;; parentheses are not visible in this view.
427 (defun event-timed-out-p (event timeout
)
428 (let ((ev-to (event-abs-timeout event
)))
429 (when (and ev-to timeout
)
(defun event-recalc-abs-timeout (event now)
  "Set the absolute timeout of EVENT to NOW plus the event's relative
timeout and return the new value."
  (let ((new-deadline (+ now (event-timeout event))))
    (setf (event-abs-timeout event) new-deadline)))
;; Queries a resource limit with ET:GET-RESOURCE-LIMIT and acts on it only
;; when it is not EQL to the infinity marker.
;; NOTE(review): the tokens `et' / `:rlimit-nofile' and `et' /
;; `:rlim-infinity' are split across lines here; presumably each pair is a
;; single package-qualified symbol.  The body of UNLESS and the closing
;; parentheses are not visible in this view.
441 (defun get-fd-limit ()
442 (let ((fd-limit (et:get-resource-limit et
:rlimit-nofile
)))
443 (unless (eql fd-limit et
:rlim-infinity
)
;; Class MULTIPLEXER, the base class named by the multiplexer generic
;; functions in this file.  The visible slot FD-LIMIT is initialised by
;; calling GET-FD-LIMIT and is read through FD-LIMIT-OF.
;; NOTE(review): the form is unbalanced as shown; another slot is presumably
;; declared on a line that is not visible (CLOSE-MULTIPLEXER refers to a slot
;; named FD).
447 (defclass multiplexer
()
449 (fd-limit :initform
(get-fd-limit)
451 :reader fd-limit-of
)))
;; Generic function MONITOR-FD (mux fd-entry).  ADD-FD calls it for a newly
;; created fd-entry and treats a false result as failure.
;; The default method on MULTIPLEXER ignores FD-ENTRY.
;; NOTE(review): the value returned by the default method and the closing
;; parentheses are not visible in this view.
454 (defgeneric monitor-fd
(mux fd-entry
)
455 (:method
((mux multiplexer
) fd-entry
)
456 (declare (ignore fd-entry
))
;; Generic function UPDATE-FD (mux fd-entry).  ADD-FD and REMOVE-EVENT call
;; it after setting FD-ENTRY-EDGE-CHANGE on an fd-entry that stays monitored.
;; The default method on MULTIPLEXER ignores FD-ENTRY.
;; NOTE(review): the value returned by the default method and the closing
;; parentheses are not visible in this view.
460 (defgeneric update-fd
(mux fd-entry
)
461 (:method
((mux multiplexer
) fd-entry
)
462 (declare (ignore fd-entry
))
;; Generic function UNMONITOR-FD (mux fd-entry).  REMOVE-EVENT calls it when
;; an fd-entry has become empty.
;; The default method on MULTIPLEXER ignores FD-ENTRY.
;; NOTE(review): the value returned by the default method and the closing
;; parentheses are not visible in this view.
466 (defgeneric unmonitor-fd
(mux fd-entry
)
467 (:method
((mux multiplexer
) fd-entry
)
468 (declare (ignore fd-entry
))
(defgeneric harvest-events (mux timeout)
  (:documentation
   "Ask the multiplexer MUX for pending events, given TIMEOUT.
DISPATCH-FD-EVENTS-ONCE treats the result as a list whose elements have the
form (fd event-types)."))
;; Generic function CLOSE-MULTIPLEXER.  It uses the PROGN method combination
;; with :MOST-SPECIFIC-LAST, so all applicable methods run, the least
;; specific one first.
;; The method on MULTIPLEXER cancels the finalization of MUX, closes the
;; descriptor held in the FD slot with ET:CLOSE and unbinds that slot (only
;; when the slot is bound), and then unbinds the FD-LIMIT slot.
;; NOTE(review): the final form of the method and the closing parentheses
;; are not visible in this view.
475 (defgeneric close-multiplexer
(mux)
476 (:method-combination progn
:most-specific-last
)
477 (:method progn
((mux multiplexer
))
478 (cancel-finalization mux
)
479 (when (slot-boundp mux
'fd
)
480 (et:close
(fd-of mux
))
481 (slot-makunbound mux
'fd
))
482 (slot-makunbound mux
'fd-limit
)
(defmethod monitor-fd :around ((mux multiplexer) fd-entry)
  "Run the next MONITOR-FD method under IGNORE-AND-PRINT-ERRORS.
Returns that call's value when it is true.  Otherwise a warning naming the
file descriptor of FD-ENTRY is issued and NIL (the value of WARN) is returned."
  ;; Warn only when the protected call yields a false value.  A true result
  ;; must be passed through unchanged: ADD-FD treats a false return from
  ;; MONITOR-FD as failure and rolls the new event back.
  ;; NOTE(review): this assumes IGNORE-AND-PRINT-ERRORS returns the value of
  ;; its body, or NIL when an error was caught -- confirm against the macro.
  (or (ignore-and-print-errors (call-next-method))
      (warn "FD monitoring failed for FD ~A."
            (fd-entry-fd fd-entry))))
(defmethod update-fd :around ((mux multiplexer) fd-entry)
  "Run the next UPDATE-FD method under IGNORE-AND-PRINT-ERRORS.
Returns that call's value when it is true.  Otherwise a warning naming the
file descriptor of FD-ENTRY is issued and NIL (the value of WARN) is returned."
  ;; Warn only when the protected call yields a false value; a successful
  ;; update must not produce the failure warning and keeps its true result.
  ;; NOTE(review): this assumes IGNORE-AND-PRINT-ERRORS returns the value of
  ;; its body, or NIL when an error was caught -- confirm against the macro.
  (or (ignore-and-print-errors (call-next-method))
      (warn "FD status update failed for FD ~A."
            (fd-entry-fd fd-entry))))
(defmethod unmonitor-fd :around ((mux multiplexer) fd-entry)
  "Run the next UNMONITOR-FD method under IGNORE-AND-PRINT-ERRORS.
Returns that call's value when it is true.  Otherwise a warning naming the
file descriptor of FD-ENTRY is issued and NIL (the value of WARN) is returned."
  ;; Warn only when the protected call yields a false value; a successful
  ;; removal must not produce the failure warning and keeps its true result.
  ;; NOTE(review): this assumes IGNORE-AND-PRINT-ERRORS returns the value of
  ;; its body, or NIL when an error was caught -- confirm against the macro.
  (or (ignore-and-print-errors (call-next-method))
      (warn "FD unmonitoring failed for FD ~A."
            (fd-entry-fd fd-entry))))
;; Macro DEFINE-MULTIPLEXER.  The expansion contains a DEFCLASS for NAME with
;; SUPERCLASSES, SLOTS and the spliced OPTIONS, and a PUSHNEW that registers
;; the cons (PRIORITY . NAME) in *AVAILABLE-MULTIPLEXERS*.
;; NOTE(review): the backquoted form that wraps both parts is not visible in
;; this view.
;; NOTE(review): PUSHNEW uses the default EQL test and the cons is freshly
;; made on every evaluation, so re-evaluating a definition adds a second
;; entry -- confirm whether a :KEY/:TEST was intended.
507 (defmacro define-multiplexer
(name priority superclasses slots
&rest options
)
509 (defclass ,name
,superclasses
,slots
,@options
)
510 (pushnew (cons ,priority
',name
)
511 *available-multiplexers
*)))
518 ;; (defun wait-until-fd-usable (event-base fd event-type &optional timeout)
520 ;; (flet ((callback (fd type)
521 ;; (cond ((eql type :error)
522 ;; (setf status :error))
523 ;; ((eql type event-type)
524 ;; (setf status :ok)))))
525 ;; (with-fd-handler (mux fd event-type #'callback)
527 ;; (serve-fd-events mux :timeout timeout)
529 ;; (return-from wait-until-fd-usable status)))))))