Fixed many errors in the multiplexer main loop.
[iolib.git] / io-multiplex / common.lisp
blob0b0ae95f6cc07733b9c6f4b5a4dd2e82c250970e
1 ;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp -*-
3 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
4 ; Copyright (C) 2006,2007 by Stelian Ionescu ;
5 ; ;
6 ; This program is free software; you can redistribute it and/or modify ;
7 ; it under the terms of the GNU General Public License as published by ;
8 ; the Free Software Foundation; either version 2 of the License, or ;
9 ; (at your option) any later version. ;
10 ; ;
11 ; This program is distributed in the hope that it will be useful, ;
12 ; but WITHOUT ANY WARRANTY; without even the implied warranty of ;
13 ; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ;
14 ; GNU General Public License for more details. ;
15 ; ;
16 ; You should have received a copy of the GNU General Public License ;
17 ; along with this program; if not, write to the ;
18 ; Free Software Foundation, Inc., ;
19 ; 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ;
20 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
22 (in-package :io.multiplex)
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defvar *available-multiplexers* nil
    "List of (PRIORITY . CLASS-NAME) conses, one for each multiplexer class
registered through DEFINE-MULTIPLEXER.")
  (defvar *best-available-multiplexer* nil
    "Class designator that a new EVENT-BASE hands to MAKE-INSTANCE in order
to create its multiplexer."))
28 ;;;
29 ;;; Event-Base
30 ;;;
(defclass event-base ()
  ((mux :reader mux-of
        :initform (make-instance *best-available-multiplexer*)
        :documentation "Multiplexer instance used to monitor the FDs.")
   (fds :reader fds-of
        :initform (make-hash-table :test 'eql)
        :documentation "Hash table mapping an FD to its FD-ENTRY.")
   (timeouts :reader timeouts-of
             :initform (make-queue)
             :documentation "Queue of the events that carry a timeout.")
   (main-timeout :accessor main-timeout-of
                 :initform nil
                 :documentation "Timeout event installed by EXIT-EVENT-LOOP, or NIL.")
   (exit :accessor exit-p
         :initform nil
         :documentation "True once the event loop has been asked to exit."))
  (:documentation
   "State of one event loop: a multiplexer, the monitored FDs and the
pending timeouts."))
;;; Print a short unreadable description of BASE on STREAM.
;;; CLOSE-EVENT-BASE makes the FDS slot unbound (and returns the event
;;; base, so it is typically printed right away); printing such an object
;;; must not signal an UNBOUND-SLOT error, so that state is reported
;;; explicitly instead of reading the slot.
(defmethod print-object ((base event-base) stream)
  (print-unreadable-object (base stream :type nil :identity t)
    (if (slot-boundp base 'fds)
        (format stream "event base, ~A FDs monitored, using: ~A"
                (hash-table-count (fds-of base))
                (mux-of base))
        (format stream "event base, closed"))))
(defgeneric close-event-base (event-base)
  (:documentation
   "Close the multiplexer of EVENT-BASE, make the FDS, TIMEOUTS and EXIT
slots unbound and return EVENT-BASE.")
  (:method ((event-base event-base))
    (close-multiplexer (mux-of event-base))
    ;; DOLIST's result form makes the method return the event base itself.
    (dolist (slot '(fds timeouts exit) event-base)
      (slot-makunbound event-base slot))))
(defgeneric add-fd (base fd event-type function &key timeout persistent)
  (:documentation
   "Register FUNCTION as a handler for EVENT-TYPE events on the file
descriptor FD and return the new event."))
(defgeneric add-timeout (event-base function timeout &key persistent)
  (:documentation
   "Register FUNCTION as a handler of a pure timeout event (one without an
FD) that uses TIMEOUT."))
(defgeneric remove-event (event-base event)
  (:documentation "Unregister EVENT from EVENT-BASE."))
(defgeneric remove-events (event-base event-list)
  (:documentation "Unregister every event of EVENT-LIST from EVENT-BASE."))
(defgeneric event-dispatch (event-base &key timeout only-once)
  (:documentation
   "Run the event loop of EVENT-BASE, dispatching FD events and expired
timeouts to their handlers."))
(defgeneric exit-event-loop (event-base &key delay)
  (:documentation
   "Install a one-shot timeout on EVENT-BASE whose handler sets the exit
flag, remember it as the main timeout and return it.  DELAY is the timeout
to use; when it is omitted or NIL a delay of 0 is used.")
  (:method ((event-base event-base) &key (delay 0))
    (setf (main-timeout-of event-base)
          (add-timeout event-base
                       #'(lambda (fd event-type)
                           (declare (ignore fd event-type))
                           (setf (exit-p event-base) t))
                       ;; ADD-TIMEOUT asserts that its timeout is non-NIL,
                       ;; so never pass NIL on.
                       (or delay 0)
                       :persistent nil))))
(defgeneric event-base-empty-p (event-base)
  (:documentation
   "True when EVENT-BASE has no FD in its FD table and its timeout queue
is empty.")
  (:method ((event-base event-base))
    (let ((fd-count (hash-table-count (fds-of event-base))))
      (and (zerop fd-count)
           (queue-empty-p (timeouts-of event-base))))))
(defgeneric fd-entry-of (event-base fd)
  (:documentation
   "Look FD up in the FD table of EVENT-BASE.  Returns the values of
GETHASH: the FD-ENTRY (or NIL) and the found-p flag.")
  (:method ((event-base event-base) fd)
    (with-accessors ((fd-table fds-of)) event-base
      (gethash fd fd-table))))
(defun %add-event (event-base event &optional fd-entry)
  "Record EVENT in the bookkeeping structures of EVENT-BASE and return it.
An event with a timeout is inserted into the timeout queue, ordered by its
absolute timeout.  An event with an FD is added to FD-ENTRY, which is then
stored in the FD table; when FD-ENTRY is not supplied, the entry already
registered for that FD is used, or a fresh one is created."
  (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
    (when (event-timeout event)
      ;; add the event to the timeout queue
      (queue-sorted-insert timeouts event #'< #'event-abs-timeout))
    (let ((fd (event-fd event)))
      ;; if it's an FD event add it to its fd-entry in the FDs hash-table;
      ;; if there's no such fd-entry, create it
      (when fd
        (let ((entry (or fd-entry
                         (gethash fd fds)
                         (make-fd-entry fd))))
          (fd-entry-add-event entry event)
          (setf (gethash fd fds) entry))))
    (values event)))
(defun %remove-event (event-base event)
  "Drop EVENT from the bookkeeping structures of EVENT-BASE and return it.
An FD event must have an FD-ENTRY registered for its FD."
  ;; An event that carries a timeout also lives in the timeout queue.
  (when (event-timeout event)
    (queue-delete (timeouts-of event-base) event))
  (let ((fd (event-fd event)))
    (when fd
      (let* ((fd-table (fds-of event-base))
             (fd-entry (gethash fd fd-table)))
        (assert fd-entry)
        (fd-entry-del-event fd-entry event)
        ;; An fd-entry without any event left is dropped from the table.
        (when (fd-entry-empty-p fd-entry)
          (remhash fd fd-table)))))
  (values event))
(defun calc-possible-edge-change-when-adding (fd-entry event-type)
  "Return :READ-ADD (resp. :WRITE-ADD) when EVENT-TYPE is :READ
(resp. :WRITE) and FD-ENTRY has no event of that type yet; NIL otherwise.
Meant to be called before the new event is added to FD-ENTRY."
  (case event-type
    (:read
     (when (queue-empty-p (fd-entry-read-events fd-entry))
       :read-add))
    (:write
     (when (queue-empty-p (fd-entry-write-events fd-entry))
       :write-add))))
(defmethod add-fd ((event-base event-base) fd event-type function &key timeout persistent)
  "Create an event for EVENT-TYPE on FD handled by FUNCTION, register it
with EVENT-BASE and return it.  Signals an error when FD is larger than the
FD limit of the multiplexer.  If the FD was not monitored yet and the
multiplexer fails to monitor it, the event is unregistered again."
  (check-type event-type fd-event)
  ;; The limit is NIL when GET-FD-LIMIT found an infinite resource limit;
  ;; in that case there is nothing to compare FD against.
  (let ((fd-limit (fd-limit-of (mux-of event-base))))
    (when (and fd-limit (> fd fd-limit))
      (error "Cannot add such a large FD: ~A" fd)))
  (let ((current-entry (fd-entry-of event-base fd))
        (event (make-event fd event-type function persistent
                           (abs-timeout timeout)
                           (normalize-timeout timeout)))
        (edge-change nil))
    (if current-entry
        (progn
          ;; The edge change must be computed before the event is queued,
          ;; since it depends on the queue being empty.
          (setf edge-change (calc-possible-edge-change-when-adding
                             current-entry event-type))
          (%add-event event-base event current-entry)
          (when edge-change
            (setf (fd-entry-edge-change current-entry) edge-change)
            (update-fd (mux-of event-base) current-entry)
            (setf (fd-entry-edge-change current-entry) nil)))
        (progn
          (setf current-entry (make-fd-entry fd))
          (%add-event event-base event current-entry)
          ;; Roll the registration back if the multiplexer refuses the FD.
          (unless (monitor-fd (mux-of event-base) current-entry)
            (%remove-event event-base event))))
    (values event)))
(defmethod add-timeout ((event-base event-base) function timeout &key persistent)
  "Create a :TIMEOUT event without an FD, handled by FUNCTION, and register
it with EVENT-BASE.  TIMEOUT must not be NIL."
  (assert timeout)
  (let ((event (make-event nil :timeout function persistent
                           (abs-timeout timeout)
                           (normalize-timeout timeout))))
    (%add-event event-base event)))
(defun calc-possible-edge-change-when-removing (fd-entry event-type)
  "Return :READ-DEL (resp. :WRITE-DEL) when EVENT-TYPE is :READ
(resp. :WRITE) and FD-ENTRY currently holds exactly one event of that type,
i.e. when removing one such event leaves none; NIL otherwise.
Meant to be called before the event is removed from FD-ENTRY."
  (flet ((last-event-p (queue)
           ;; True when QUEUE holds exactly one event.  While other events
           ;; of the same type remain, the FD must keep being monitored
           ;; for that type, so no edge change is reported.
           (and (not (queue-empty-p queue))
                (null (rest (queue-head queue))))))
    (case event-type
      (:read
       (when (last-event-p (fd-entry-read-events fd-entry))
         :read-del))
      (:write
       (when (last-event-p (fd-entry-write-events fd-entry))
         :write-del)))))
(defmethod remove-event ((event-base event-base) event)
  "Unregister EVENT from EVENT-BASE and return EVENT-BASE.  When the FD of
EVENT has an FD-ENTRY, the multiplexer is told either to stop monitoring
the FD (the entry became empty) or to update it (an edge change was
computed)."
  (check-type (event-type event) event-type)
  (let ((entry (fd-entry-of event-base (event-fd event))))
    (cond ((null entry)
           ;; No fd-entry for this event: only the bookkeeping is updated.
           (%remove-event event-base event))
          (t
           ;; The edge change is computed before the event is removed.
           (let ((edge (calc-possible-edge-change-when-removing
                        entry (event-type event))))
             (%remove-event event-base event)
             (cond ((fd-entry-empty-p entry)
                    (unmonitor-fd (mux-of event-base) entry))
                   (edge
                    ;; The edge change is only set for the duration of
                    ;; the UPDATE-FD call.
                    (setf (fd-entry-edge-change entry) edge)
                    (update-fd (mux-of event-base) entry)
                    (setf (fd-entry-edge-change entry) nil)))))))
  (values event-base))
(defmethod remove-events ((event-base event-base) event-list)
  "Call REMOVE-EVENT on every element of the list EVENT-LIST, in order.
Returns NIL."
  (loop :for event :in event-list
        :do (remove-event event-base event)))
(defmacro with-fd-handler ((event-base fd event-type function
                                       &optional timeout)
                           &body body)
  "Execute BODY with FUNCTION registered on EVENT-BASE as a persistent
handler for EVENT-TYPE events on FD (with the optional TIMEOUT).  The
handler is removed again when BODY exits, normally or through a non-local
exit.  The EVENT-BASE form is evaluated exactly once."
  (let ((base (gensym "EVENT-BASE-"))
        (event (gensym "EVENT-")))
    ;; EVENT-BASE is needed both for ADD-FD and for the cleanup form, so it
    ;; is bound once instead of being spliced in twice.
    `(let ((,base ,event-base)
           (,event nil))
       (unwind-protect
            (progn
              (setf ,event (add-fd ,base ,fd ,event-type ,function
                                   :persistent t
                                   :timeout ,timeout))
              ,@body)
         (when ,event
           (remove-event ,base ,event))))))
(defmethod event-dispatch :around ((event-base event-base) &key timeout only-once)
  "Reset the exit flag and the main timeout of EVENT-BASE, then run the
primary method unless the event base is empty.  When TIMEOUT is given, an
exit timeout is installed with that delay and ONLY-ONCE is forced to T."
  (setf (exit-p event-base) nil
        (main-timeout-of event-base) nil)
  (when timeout
    (exit-event-loop event-base :delay timeout)
    (setf only-once t))
  (if (event-base-empty-p event-base)
      nil
      (call-next-method event-base
                        :timeout timeout
                        :only-once only-once)))
(defmethod event-dispatch ((event-base event-base) &key timeout only-once)
  "Main loop.  Each iteration recomputes the absolute timeouts of the queued
events, dispatches the FD events once, removes the main timeout (if any),
dispatches the expired timeout events, removes the non-persistent ones and
re-sorts the timeout queue.  The loop ends after one iteration when
ONLY-ONCE is true, or as soon as the exit flag is set or the event base is
empty."
  (with-accessors ((exit-p exit-p) (timeouts timeouts-of)) event-base
    ;; The timeout handed to the multiplexer is computed once, before the
    ;; first iteration.
    (let* ((min-event-timeout (events-calc-min-rel-timeout timeouts))
           (actual-timeout (calc-min-timeout min-event-timeout timeout))
           (before nil)
           (after nil))
      (loop
         :with deletion-list := ()
         :with dispatch-list := ()
         ;; The body forms must be introduced by :DO; a compound form is
         ;; not allowed directly after a :WITH clause.
         :do
         (setf before (gettime))
         (mapc #'(lambda (ev)
                   (event-recalc-abs-timeout ev before))
               (queue-head timeouts))
         (dispatch-fd-events-once event-base actual-timeout)
         (setf after (gettime))

         (let ((main-timeout (main-timeout-of event-base)))
           (when main-timeout
             (remove-event event-base main-timeout)
             (setf (main-timeout-of event-base) nil)))

         (multiple-value-setq (deletion-list dispatch-list)
           (filter-expired-events (expired-events timeouts after)))
         (dispatch-timeouts dispatch-list)
         (remove-events event-base deletion-list)

         (queue-sort timeouts #'< #'event-abs-timeout)

         :when (or only-once
                   exit-p
                   (event-base-empty-p event-base))
         :do (loop-finish)))))
(defun dispatch-fd-events-once (event-base timeout)
  "Harvest the pending FD events from the multiplexer of EVENT-BASE (waiting
according to TIMEOUT) and call the matching handlers.  Afterwards remove
every event of an FD that reported :ERROR and every non-persistent :READ
or :WRITE event that was dispatched.  Returns NIL."
  (let ((deletion-list ()))
    (flet ((schedule-removal (events)
             ;; Collect into fresh conses only: the lists handed in may
             ;; share structure with the queues of the fd-entry, so they
             ;; must not be modified destructively.  PUSHNEW also keeps an
             ;; event from being removed twice (e.g. :ERROR together with
             ;; :READ on the same FD).
             (dolist (event events)
               (pushnew event deletion-list :test #'eq))))
      (dolist (ev (harvest-events (mux-of event-base) timeout))
        (destructuring-bind (fd ev-types) ev
          (let ((fd-entry (fd-entry-of event-base fd)))
            (cond ((null fd-entry)
                   (warn "Got spurious event for non-monitored FD: ~A" fd))
                  (t
                   (when (member :error ev-types)
                     (dispatch-error-events fd-entry)
                     ;; Added to, not substituted for, what was collected
                     ;; for the FDs processed earlier.
                     (schedule-removal (fd-entry-all-events fd-entry)))
                   (when (member :read ev-types)
                     (dispatch-read-events fd-entry)
                     (schedule-removal
                      (fd-entry-one-shot-events fd-entry :read)))
                   (when (member :write ev-types)
                     (dispatch-write-events fd-entry)
                     (schedule-removal
                      (fd-entry-one-shot-events fd-entry :write)))))))))
    ;; PUSHNEW collected in reverse; restore the collection order.
    (dolist (ev (nreverse deletion-list))
      (remove-event event-base ev))))
(defun expired-events (queue now)
  "Filter QUEUE (through QUEUE-FILTER) for the events whose absolute timeout
is set and is not later than NOW."
  (flet ((expired-p (abs-timeout)
           (and abs-timeout
                (<= abs-timeout now))))
    (queue-filter queue #'expired-p #'event-abs-timeout)))
(defun filter-expired-events (events)
  "Split the list EVENTS in two.  Returns two fresh lists as multiple
values: the non-persistent events (to be deleted) and all the events (to be
dispatched).  Both lists are in the reverse order of EVENTS."
  (let ((one-shot-events (remove-if #'event-persistent-p events)))
    (values (reverse one-shot-events)
            (reverse events))))
(defun dispatch-timeouts (dispatch-list)
  "Call the handler of every event in DISPATCH-LIST with NIL as the FD
and :TIMEOUT as the event type.  Returns NIL."
  (loop :for event :in dispatch-list
        :do (funcall (event-handler event) nil :timeout)))
(defun events-calc-min-rel-timeout (timeouts)
  "Find the first event in the queue TIMEOUTS whose absolute timeout is
either NIL or still in the future.  Return the time left until that
timeout, or NIL when there is no such event or its absolute timeout is
NIL."
  (let ((now (gettime)))
    (flet ((still-pending-p (abs-timeout)
             (or (null abs-timeout)
                 (< now abs-timeout))))
      (let* ((candidate (find-if #'still-pending-p
                                 (queue-head timeouts)
                                 :key #'event-abs-timeout))
             (deadline (and candidate
                            (event-abs-timeout candidate))))
        (when deadline
          (- deadline now))))))
(defun dispatch-error-events (fd-entry)
  "Call the handler of every error event of FD-ENTRY with the FD of the
entry and :ERROR.  Returns NIL."
  (loop :for event :in (queue-head (fd-entry-error-events fd-entry))
        :do (funcall (event-handler event) (fd-entry-fd fd-entry) :error)))
(defun dispatch-read-events (fd-entry)
  "Call the handler of every read event of FD-ENTRY with the FD of the
entry and :READ.  Returns NIL."
  (loop :for event :in (queue-head (fd-entry-read-events fd-entry))
        :do (funcall (event-handler event) (fd-entry-fd fd-entry) :read)))
(defun dispatch-write-events (fd-entry)
  "Call the handler of every write event of FD-ENTRY with the FD of the
entry and :WRITE.  Returns NIL."
  (loop :for event :in (queue-head (fd-entry-write-events fd-entry))
        :do (funcall (event-handler event) (fd-entry-fd fd-entry) :write)))
345 ;;; FD-Entry
(deftype fd-event ()
  "The kinds of event that can be registered on a file descriptor."
  '(member :read :write :error))
(deftype event-type ()
  "Any FD-EVENT, or :TIMEOUT for an event that is a pure timeout."
  '(or fd-event (member :timeout)))
(defstruct (fd-entry
             (:copier nil)
             (:constructor make-fd-entry (fd)))
  "Per-FD record: the file descriptor and one queue of events for each of
the :READ, :WRITE and :ERROR event types."
  (fd 0 :type fixnum)
  ;; Set around a call to UPDATE-FD (to :READ-ADD, :WRITE-ADD, :READ-DEL
  ;; or :WRITE-DEL) and reset to NIL right after it.
  (edge-change nil :type symbol)
  (read-events (make-queue) :type queue)
  (write-events (make-queue) :type queue)
  (error-events (make-queue) :type queue))
(defun fd-entry-event-list (fd-entry event-type)
  "Return the event queue of FD-ENTRY that holds the events of type
EVENT-TYPE (:READ, :WRITE or :ERROR)."
  (check-type fd-entry fd-entry)
  (check-type event-type fd-event)
  (cond ((eql event-type :read)
         (fd-entry-read-events fd-entry))
        ((eql event-type :write)
         (fd-entry-write-events fd-entry))
        ((eql event-type :error)
         (fd-entry-error-events fd-entry))))
(defun (setf fd-entry-event-list) (event-list fd-entry event-type)
  "Store EVENT-LIST as the event queue of FD-ENTRY for EVENT-TYPE
(:READ, :WRITE or :ERROR) and return it.
Used as (setf (fd-entry-event-list fd-entry event-type) event-list)."
  ;; A setf function receives the new value as its FIRST argument,
  ;; followed by the arguments of the place.
  (check-type fd-entry fd-entry)
  (check-type event-type fd-event)
  (case event-type
    (:read (setf (fd-entry-read-events fd-entry) event-list))
    (:write (setf (fd-entry-write-events fd-entry) event-list))
    (:error (setf (fd-entry-error-events fd-entry) event-list))))
(defun fd-entry-empty-p (fd-entry)
  "True when the read, write and error queues of FD-ENTRY are all empty."
  (when (queue-empty-p (fd-entry-read-events fd-entry))
    (when (queue-empty-p (fd-entry-write-events fd-entry))
      (queue-empty-p (fd-entry-error-events fd-entry)))))
(defun fd-entry-add-event (fd-entry event)
  "Enqueue EVENT on the queue of FD-ENTRY that matches the type of EVENT."
  (let ((queue (fd-entry-event-list fd-entry (event-type event))))
    (queue-enqueue queue event)))
(defun fd-entry-del-event (fd-entry event)
  "Delete EVENT from the queue of FD-ENTRY that matches the type of EVENT."
  (let ((queue (fd-entry-event-list fd-entry (event-type event))))
    (queue-delete queue event)))
(defun fd-entry-all-events (fd-entry)
  "Return a list of the read, write and error events of FD-ENTRY, in that
order.  As with any APPEND, the tail of the result is the list of error
events itself, not a copy."
  (let ((read-events (queue-head (fd-entry-read-events fd-entry)))
        (write-events (queue-head (fd-entry-write-events fd-entry)))
        (error-events (queue-head (fd-entry-error-events fd-entry))))
    (append read-events write-events error-events)))
(defun fd-entry-one-shot-events (fd-entry event-type)
  "Return a list of the non-persistent events of type EVENT-TYPE that are
registered in FD-ENTRY."
  (loop :for event :in (queue-head (fd-entry-event-list fd-entry event-type))
        :unless (event-persistent-p event)
          :collect event))
410 ;;; Event
(defstruct (event
             (:copier nil)
             (:constructor make-event (fd type handler persistent-p
                                          abs-timeout timeout)))
  "An event registered with an event base: what to watch and which handler
to call."
  ;; the file descriptor, or NIL when the event is a pure timeout
  (fd nil :type (or null fixnum))
  (type nil :type (or null event-type))
  ;; function called with the FD and the event type
  (handler nil :type (or null function))
  ;; a non-persistent event is removed after it
  ;; occurs or after it times out
  (persistent-p nil :type boolean)
  ;; ABS-TIMEOUT is recomputed as the current time plus TIMEOUT
  (abs-timeout nil :type (or null timeout))
  (timeout nil :type (or null timeout)))
(defun event-timed-out-p (event timeout)
  "True when both the absolute timeout of EVENT and TIMEOUT are non-NIL and
TIMEOUT is strictly less than the absolute timeout of EVENT."
  ;; NOTE(review): EXPIRED-EVENTS treats an event as expired when its
  ;; absolute timeout is <= now; the comparison here goes the other way
  ;; round -- confirm the intended meaning of TIMEOUT against the callers.
  (let ((abs-timeout (event-abs-timeout event)))
    (if (and abs-timeout timeout)
        (< timeout abs-timeout)
        nil)))
(defun event-recalc-abs-timeout (event now)
  "Set the absolute timeout of EVENT to NOW plus the timeout of EVENT and
return the new value."
  (let ((new-abs-timeout (+ now (event-timeout event))))
    (setf (event-abs-timeout event) new-abs-timeout)))
438 ;;; Multiplexer
(defun get-fd-limit ()
  "Return the RLIMIT-NOFILE resource limit minus one, or NIL when that
limit is infinite."
  (let ((limit (et:get-resource-limit et:rlimit-nofile)))
    (if (eql limit et:rlim-infinity)
        nil
        (1- limit))))
(defclass multiplexer ()
  ((fd :reader fd-of
       :documentation "FD owned by the multiplexer; closed by
CLOSE-MULTIPLEXER when the slot is bound.")
   (fd-limit :reader fd-limit-of
             :initarg :fd-limit
             :initform (get-fd-limit)
             :documentation "Largest FD that ADD-FD accepts; NIL when
GET-FD-LIMIT found no limit."))
  (:documentation "Base class of the FD multiplexers."))
(defgeneric monitor-fd (mux fd-entry)
  (:documentation
   "Start monitoring the FD described by FD-ENTRY with MUX.  Returns true
on success; ADD-FD undoes the registration when the result is false.")
  (:method ((mux multiplexer) fd-entry)
    ;; Default method: nothing to do, report success.
    (declare (ignore fd-entry))
    t))
(defgeneric update-fd (mux fd-entry)
  (:documentation
   "Tell MUX that the set of event types registered in FD-ENTRY changed.
Callers set the edge-change slot of FD-ENTRY around the call.  Returns true
on success.")
  (:method ((mux multiplexer) fd-entry)
    ;; Default method: nothing to do, report success.
    (declare (ignore fd-entry))
    t))
(defgeneric unmonitor-fd (mux fd-entry)
  (:documentation
   "Stop monitoring the FD described by FD-ENTRY with MUX.  Returns true on
success.")
  (:method ((mux multiplexer) fd-entry)
    ;; Default method: nothing to do, report success.
    (declare (ignore fd-entry))
    t))
(defgeneric harvest-events (mux timeout)
  (:documentation
   "Collect the pending FD events from MUX, given TIMEOUT.  The result is a
list whose elements have the form (FD EVENT-TYPES), where EVENT-TYPES is a
list that may contain :READ, :WRITE and :ERROR."))
(defgeneric close-multiplexer (mux)
  (:documentation
   "Release the resources of MUX.  Methods are combined with PROGN, the
least specific one running first.")
  (:method-combination progn :most-specific-last)
  (:method progn ((mux multiplexer))
    ;; Cancel the finalization, close the FD if there is one, then make
    ;; both slots unbound; MUX is the value of this method.
    (prog1 mux
      (cancel-finalization mux)
      (when (slot-boundp mux 'fd)
        (et:close (fd-of mux))
        (slot-makunbound mux 'fd))
      (slot-makunbound mux 'fd-limit))))
;;; Run the next MONITOR-FD method inside IGNORE-AND-PRINT-ERRORS.
;;; Return T when it yields a true value; otherwise emit a warning and
;;; return NIL (the value of WARN), which ADD-FD treats as a failure.
(defmethod monitor-fd :around ((mux multiplexer) fd-entry)
  (if (ignore-and-print-errors (call-next-method))
      t
      (warn "FD monitoring failed for FD ~A."
            (fd-entry-fd fd-entry))))
;;; Run the next UPDATE-FD method inside IGNORE-AND-PRINT-ERRORS.
;;; Return T when it yields a true value; otherwise emit a warning and
;;; return NIL (the value of WARN).
(defmethod update-fd :around ((mux multiplexer) fd-entry)
  (if (ignore-and-print-errors (call-next-method))
      t
      (warn "FD status update failed for FD ~A."
            (fd-entry-fd fd-entry))))
;;; Run the next UNMONITOR-FD method inside IGNORE-AND-PRINT-ERRORS.
;;; Return T when it yields a true value; otherwise emit a warning and
;;; return NIL (the value of WARN).
(defmethod unmonitor-fd :around ((mux multiplexer) fd-entry)
  (if (ignore-and-print-errors (call-next-method))
      t
      (warn "FD unmonitoring failed for FD ~A."
            (fd-entry-fd fd-entry))))
(defmacro define-multiplexer (name priority superclasses slots &rest options)
  "Define the class NAME (SUPERCLASSES, SLOTS and OPTIONS are passed on to
DEFCLASS) and register it as (PRIORITY . NAME) in *AVAILABLE-MULTIPLEXERS*.
Evaluating the same definition again does not add a second entry."
  `(progn
     (defclass ,name ,superclasses ,slots ,@options)
     ;; The entry is a freshly allocated cons, so the default EQL test
     ;; would never find it already present; compare by contents instead.
     (pushnew (cons ,priority ',name)
              *available-multiplexers*
              :test #'equal)))
513 ;;;;
514 ;;;; Misc
515 ;;;;
518 ;; (defun wait-until-fd-usable (event-base fd event-type &optional timeout)
519 ;; (let (status)
520 ;; (flet ((callback (fd type)
521 ;; (cond ((eql type :error)
522 ;; (setf status :error))
523 ;; ((eql type event-type)
524 ;; (setf status :ok)))))
525 ;; (with-fd-handler (mux fd event-type #'callback)
526 ;; (loop
527 ;; (serve-fd-events mux :timeout timeout)
528 ;; (when status
529 ;; (return-from wait-until-fd-usable status)))))))