Fix in WAIT-UNTIL-FD-READY: on EINTR the timeout is recalculated before calling again...
[iolib.git] / io-multiplex / common.lisp
blobd81fd2929414e69e386d13543915aa424dd01f44
1 ;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp -*-
3 ;; Copyright (C) 2006, 2007 Stelian Ionescu
4 ;;
5 ;; This code is free software; you can redistribute it and/or
6 ;; modify it under the terms of the version 2.1 of
7 ;; the GNU Lesser General Public License as published by
8 ;; the Free Software Foundation, as clarified by the
9 ;; preamble found here:
10 ;; http://opensource.franz.com/preamble.html
12 ;; This program is distributed in the hope that it will be useful,
13 ;; but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ;; GNU General Public License for more details.
17 ;; You should have received a copy of the GNU Lesser General
18 ;; Public License along with this library; if not, write to the
19 ;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 ;; Boston, MA 02110-1301, USA
22 (in-package :io.multiplex)
24 (eval-when (:compile-toplevel :load-toplevel :execute)
25 (defvar *available-multiplexers* nil)
26 (defvar *best-available-multiplexer* nil))
28 (defvar *maximum-event-loop-timeout* 1)
31 ;;;
32 ;;; Event-Base
33 ;;;
34 (defclass event-base ()
35 ((mux :initform (make-instance *best-available-multiplexer*)
36 :reader mux-of)
37 (fds :initform (make-hash-table :test 'eql)
38 :reader fds-of)
39 (timeouts :initform (make-queue)
40 :reader timeouts-of)
41 (exit :initform nil
42 :accessor exit-p)
43 (exit-when-empty :initarg :exit-when-empty
44 :accessor exit-when-empty-p))
45 (:default-initargs :exit-when-empty nil))
47 (defmethod print-object ((base event-base) stream)
48 (print-unreadable-object (base stream :type nil :identity t)
49 (format stream "event base, ~A FDs monitored, using: ~A"
50 (hash-table-count (fds-of base))
51 (mux-of base))))
53 (defmethod close ((event-base event-base) &key abort)
54 (declare (ignore abort))
55 (with-accessors ((mux mux-of)) event-base
56 (close mux)
57 (dolist (slot '(fds timeouts exit))
58 (setf (slot-value event-base slot) nil))
59 event-base))
61 (defgeneric add-fd (base fd event-type function &key timeout persistent))
63 (defgeneric add-timeout (event-base function timeout &key persistent))
65 (defgeneric remove-event (event-base event))
67 (defgeneric remove-events (event-base event-list))
69 (defgeneric event-dispatch (event-base &key &allow-other-keys))
71 (defgeneric exit-event-loop (event-base &key delay)
72 (:method ((event-base event-base) &key (delay 0))
73 (add-timeout event-base
74 #'(lambda (fd event-type)
75 (declare (ignore fd event-type))
76 (setf (exit-p event-base) t))
77 delay :persistent nil)))
79 (defgeneric event-base-empty-p (event-base)
80 (:method ((event-base event-base))
81 (and (zerop (hash-table-count (fds-of event-base)))
82 (queue-empty-p (timeouts-of event-base)))))
84 (defgeneric fd-entry-of (event-base fd)
85 (:method ((event-base event-base) fd)
86 (gethash fd (fds-of event-base))))
89 (defun %add-event (event-base event &optional fd-entry)
90 (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
91 (when (event-timeout event)
92 ;; add the event to the timeout queue
93 (queue-sorted-insert timeouts event #'< #'event-abs-timeout))
94 (let ((fd (event-fd event)))
95 ;; if it's an FD event add it to its fd-entry int the FDs hash-table
96 ;; if there's no such fd-entry, create it
97 (when fd
98 (fd-entry-add-event fd-entry event)
99 (setf (gethash fd fds) fd-entry))
100 (values event))))
102 (defun %remove-event (event-base event)
103 (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
104 (when (event-timeout event)
105 ;; remove the event from the timeout queue
106 (queue-delete timeouts event))
107 (let ((fd (event-fd event)))
108 ;; if it's an FD event remove it from its fd-entry
109 ;; if the fd-emtry is then empty, remove it
110 (when fd
111 (let ((fd-entry (gethash fd fds)))
112 (assert fd-entry)
113 (fd-entry-del-event fd-entry event)
114 (when (fd-entry-empty-p fd-entry)
115 (remhash fd fds))))
116 (values event))))
118 (defun calc-possible-edge-change-when-adding (fd-entry event-type)
119 (cond ((and (eql event-type :read)
120 (queue-empty-p (fd-entry-read-events fd-entry)))
121 :read-add)
122 ((and (eql event-type :write)
123 (queue-empty-p (fd-entry-write-events fd-entry)))
124 :write-add)))
126 (defmethod add-fd ((event-base event-base) fd event-type function &key timeout persistent)
127 (check-type fd unsigned-byte)
128 (check-type event-type fd-event)
129 (let ((fd-limit (fd-limit-of (mux-of event-base))))
130 (when (and fd-limit (> fd fd-limit))
131 (error "Cannot add such a large FD: ~A" fd)))
132 (let ((current-entry (fd-entry-of event-base fd))
133 (event (make-event fd event-type function persistent
134 (abs-timeout timeout)
135 (normalize-timeout timeout)))
136 (edge-change nil))
137 (if current-entry
138 (progn
139 (setf edge-change (calc-possible-edge-change-when-adding
140 current-entry event-type))
141 (%add-event event-base event current-entry)
142 (when edge-change
143 (setf (fd-entry-edge-change current-entry) edge-change)
144 (update-fd (mux-of event-base) current-entry)
145 (setf (fd-entry-edge-change current-entry) nil)))
146 (progn
147 (setf current-entry (make-fd-entry fd))
148 (%add-event event-base event current-entry)
149 (unless (monitor-fd (mux-of event-base) current-entry)
150 (%remove-event event-base event))))
151 (values event)))
153 (defmethod add-timeout ((event-base event-base) function timeout &key persistent)
154 (assert timeout)
155 (%add-event event-base (make-event nil :timeout function persistent
156 (abs-timeout timeout)
157 (normalize-timeout timeout))))
159 (defun calc-possible-edge-change-when-removing (fd-entry event-type)
160 (cond ((and (eql event-type :read)
161 (not (queue-empty-p (fd-entry-read-events fd-entry))))
162 :read-del)
163 ((and (eql event-type :write)
164 (not (queue-empty-p (fd-entry-write-events fd-entry))))
165 :write-del)))
167 (defmethod remove-event ((event-base event-base) event)
168 (check-type (event-type event) event-type)
169 (let* ((fd (event-fd event))
170 (current-entry (fd-entry-of event-base fd))
171 (edge-change nil))
172 (if current-entry
173 (progn
174 (setf edge-change (calc-possible-edge-change-when-removing
175 current-entry (event-type event)))
176 (%remove-event event-base event)
177 (if (fd-entry-empty-p current-entry)
178 (unmonitor-fd (mux-of event-base) current-entry)
179 (when edge-change
180 (setf (fd-entry-edge-change current-entry) edge-change)
181 (update-fd (mux-of event-base) current-entry)
182 (setf (fd-entry-edge-change current-entry) nil))))
183 (%remove-event event-base event)))
184 (values event-base))
186 (defmacro with-fd-handler ((event-base fd event-type function
187 &optional timeout)
188 &body body)
189 (once-only (event-base)
190 (with-gensyms (event)
191 `(let (,event)
192 (unwind-protect
193 (progn
194 (setf ,event (add-fd ,event-base ,fd ,event-type ,function
195 :persistent t
196 :timeout ,timeout))
197 ,@body)
198 (when ,event
199 (remove-event ,event-base ,event)))))))
201 (defmethod event-dispatch :around ((event-base event-base) &key timeout only-once)
202 (setf (exit-p event-base) nil)
203 (when timeout (exit-event-loop event-base :delay timeout))
204 (call-next-method event-base :only-once only-once))
206 (defun recalculate-timeouts (timeouts)
207 (let ((now (gettime)))
208 (dolist (ev (queue-head timeouts))
209 (event-recalc-abs-timeout ev now))))
211 (defun dispatch-timeouts (dispatch-list)
212 (dolist (ev dispatch-list)
213 (funcall (event-handler ev) nil :timeout)))
215 (defmethod remove-events ((event-base event-base) event-list)
216 (dolist (ev event-list)
217 (remove-event event-base ev)))
219 (defmethod event-dispatch ((event-base event-base) &key only-once)
220 (with-accessors ((mux mux-of) (fds fds-of)
221 (exit-p exit-p) (exit-when-empty exit-when-empty-p)
222 (timeouts timeouts-of)) event-base
223 (flet ((recalc-poll-timeout ()
224 (calc-min-timeout (events-calc-min-rel-timeout timeouts)
225 *maximum-event-loop-timeout*)))
226 (do ((poll-timeout (recalc-poll-timeout) (recalc-poll-timeout))
227 (deletion-list () ())
228 (dispatch-list () ()))
229 ((or exit-p (and exit-when-empty (event-base-empty-p event-base))))
230 (recalculate-timeouts timeouts)
231 (when (dispatch-fd-events-once event-base poll-timeout)
232 (and only-once (setf exit-p t)))
233 (setf (values deletion-list dispatch-list)
234 (filter-expired-events (expired-events timeouts (gettime))))
235 (dispatch-timeouts dispatch-list)
236 (remove-events event-base deletion-list)
237 (queue-sort timeouts #'< #'event-abs-timeout)))))
239 (defun dispatch-fd-events-once (event-base timeout)
240 "Waits for events and dispatches them. Returns T if some events have been received, NIL otherwise."
241 (with-accessors ((mux mux-of) (fds fds-of)
242 (timeouts timeouts-of)) event-base
243 (let ((deletion-list ())
244 (fd-events (harvest-events mux timeout)))
245 (dolist (ev fd-events)
246 (destructuring-bind (fd ev-types) ev
247 (let ((fd-entry (fd-entry-of event-base fd)))
248 (if fd-entry
249 (let ((errorp (member :error ev-types)))
250 (when errorp
251 (dispatch-error-events fd-entry)
252 (nconcf deletion-list
253 (fd-entry-all-events fd-entry)))
254 (when (member :read ev-types)
255 (dispatch-read-events fd-entry)
256 (or errorp (nconcf deletion-list
257 (fd-entry-one-shot-events fd-entry :read))))
258 (when (member :write ev-types)
259 (dispatch-write-events fd-entry)
260 (or errorp (nconcf deletion-list
261 (fd-entry-one-shot-events fd-entry :write)))))
262 (warn "Got spurious event for non-monitored FD: ~A" fd)))))
263 (dolist (ev deletion-list)
264 (remove-event event-base ev))
265 (consp fd-events))))
267 (defun expired-events (queue now)
268 (queue-filter queue
269 #'(lambda (to)
270 (and to (<= to now)))
271 #'event-abs-timeout))
273 (defun filter-expired-events (events)
274 (let ((deletion-list ())
275 (dispatch-list ()))
276 (dolist (ev events)
277 (push ev dispatch-list)
278 (unless (event-persistent-p ev)
279 (push ev deletion-list)))
280 (values deletion-list dispatch-list)))
282 (defun events-calc-min-rel-timeout (timeouts)
283 (let* ((now (gettime))
284 (first-valid-event (find-if #'(lambda (to)
285 (or (null to) (< now to)))
286 (queue-head timeouts)
287 :key #'event-abs-timeout)))
288 (when (and first-valid-event
289 (event-abs-timeout first-valid-event))
290 (- (event-abs-timeout first-valid-event) now))))
292 (defun dispatch-error-events (fd-entry)
293 (dolist (ev (queue-head (fd-entry-error-events fd-entry)))
294 (funcall (event-handler ev) (fd-entry-fd fd-entry) :error)))
296 (defun dispatch-read-events (fd-entry)
297 (dolist (ev (queue-head (fd-entry-read-events fd-entry)))
298 (funcall (event-handler ev) (fd-entry-fd fd-entry) :read)))
300 (defun dispatch-write-events (fd-entry)
301 (dolist (ev (queue-head (fd-entry-write-events fd-entry)))
302 (funcall (event-handler ev) (fd-entry-fd fd-entry) :write)))
305 ;;; FD-Entry
307 (deftype fd-event ()
308 '(member :read :write :error))
310 (deftype event-type ()
311 '(or fd-event (member :timeout)))
313 (defstruct (fd-entry
314 (:constructor make-fd-entry (fd))
315 (:copier nil))
316 (fd 0 :type unsigned-byte)
317 (edge-change nil :type symbol)
318 (read-events (make-queue) :type queue)
319 (write-events (make-queue) :type queue)
320 (error-events (make-queue) :type queue))
322 (defun fd-entry-event-list (fd-entry event-type)
323 (check-type fd-entry fd-entry)
324 (check-type event-type fd-event)
325 (case event-type
326 (:read (fd-entry-read-events fd-entry))
327 (:write (fd-entry-write-events fd-entry))
328 (:error (fd-entry-error-events fd-entry))))
330 (defun (setf fd-entry-event-list) (fd-entry event-list event-type)
331 (check-type fd-entry fd-entry)
332 (check-type event-type fd-event)
333 (case event-type
334 (:read (setf (fd-entry-read-events fd-entry) event-list))
335 (:write (setf (fd-entry-write-events fd-entry) event-list))
336 (:error (setf (fd-entry-error-events fd-entry) event-list))))
338 (defun fd-entry-empty-p (fd-entry)
339 (and (queue-empty-p (fd-entry-read-events fd-entry))
340 (queue-empty-p (fd-entry-write-events fd-entry))
341 (queue-empty-p (fd-entry-error-events fd-entry))))
343 (defun fd-entry-add-event (fd-entry event)
344 (queue-enqueue (fd-entry-event-list fd-entry (event-type event))
345 event))
347 (defun fd-entry-del-event (fd-entry event)
348 (queue-delete (fd-entry-event-list fd-entry (event-type event))
349 event))
351 (defun fd-entry-all-events (fd-entry)
352 (append (queue-head (fd-entry-read-events fd-entry))
353 (queue-head (fd-entry-write-events fd-entry))
354 (queue-head (fd-entry-error-events fd-entry))))
356 (defun fd-entry-one-shot-events (fd-entry event-type)
357 (remove-if #'event-persistent-p
358 (queue-head (fd-entry-event-list fd-entry event-type))))
361 ;;; Event
363 (defstruct (event
364 (:constructor make-event (fd type handler persistent-p
365 abs-timeout timeout))
366 (:copier nil))
367 ;; a file descriptor or nil in case of a timeout
368 (fd nil :type (or null unsigned-byte))
369 (type nil :type (or null event-type))
370 (handler nil :type (or null function))
371 ;; if an event is not persistent it is removed
372 ;; after it occurs or if it times out
373 (persistent-p nil :type boolean)
374 (abs-timeout nil :type (or null timeout))
375 (timeout nil :type (or null timeout)))
377 (defun event-timed-out-p (event timeout)
378 (let ((ev-to (event-abs-timeout event)))
379 (when (and ev-to timeout)
380 (< timeout ev-to))))
382 (defun event-recalc-abs-timeout (event now)
383 (setf (event-abs-timeout event)
384 (+ now (event-timeout event))))
387 ;;; Multiplexer
390 (defun get-fd-limit ()
391 (let ((fd-limit (et:get-resource-limit et:rlimit-nofile)))
392 (unless (eql fd-limit et:rlim-infinity)
393 (1- fd-limit))))
395 (defclass multiplexer ()
396 ((fd :reader fd-of)
397 (fd-limit :initform (get-fd-limit)
398 :initarg :fd-limit
399 :reader fd-limit-of)))
401 (defgeneric monitor-fd (mux fd-entry)
402 (:method ((mux multiplexer) fd-entry)
403 (declare (ignore fd-entry))
406 (defgeneric update-fd (mux fd-entry)
407 (:method ((mux multiplexer) fd-entry)
408 (declare (ignore fd-entry))
411 (defgeneric unmonitor-fd (mux fd-entry)
412 (:method ((mux multiplexer) fd-entry)
413 (declare (ignore fd-entry))
416 (defgeneric harvest-events (mux timeout))
418 (defgeneric close-multiplexer (mux)
419 (:method-combination progn :most-specific-last)
420 (:method progn ((mux multiplexer))
421 (when (slot-value mux 'fd)
422 (et:close (fd-of mux))
423 (setf (slot-value mux 'fd) nil))
424 mux))
426 (defmethod monitor-fd :around ((mux multiplexer) fd-entry)
427 (if (ignore-and-print-errors (call-next-method))
429 (warn "FD monitoring failed for FD ~A."
430 (fd-entry-fd fd-entry))))
432 (defmethod update-fd :around ((mux multiplexer) fd-entry)
433 (if (ignore-and-print-errors (call-next-method))
435 (warn "FD status update failed for FD ~A."
436 (fd-entry-fd fd-entry))))
438 (defmethod unmonitor-fd :around ((mux multiplexer) fd-entry)
439 (if (ignore-and-print-errors (call-next-method))
441 (warn "FD unmonitoring failed for FD ~A."
442 (fd-entry-fd fd-entry))))
444 (defmacro define-multiplexer (name priority superclasses slots &rest options)
445 `(progn
446 (defclass ,name ,superclasses ,slots ,@options)
447 (pushnew (cons ,priority ',name)
448 *available-multiplexers*)))
450 ;;;;
451 ;;;; Misc
452 ;;;;
454 ;; FIXME: Until a way to autodetect platform features is implemented
455 (define-constant et::pollrdhup 0)
457 (defun wait-until-fd-ready (fd event-type &optional timeout)
458 ;; FIXME: this conses badly for its return value
459 ;; solution: (1) use a fixnum bitmap, just like C
460 ;; (2) if we really want to expose only lists of keyword as the API,
461 ;; cache a bitmap-indexed vector of all the combinations (sharing tails)
462 (flet ((choose-poll-flags (type)
463 (ecase type
464 (:read (logior et:pollin et::pollrdhup et:pollpri))
465 (:write (logior et:pollout et:pollhup))
466 (:read-write (logior et:pollin et::pollrdhup et:pollpri
467 et:pollout et:pollhup)))))
468 (let ((status ()))
469 (with-foreign-object (pollfd 'et:pollfd)
470 (et:bzero pollfd et:size-of-pollfd)
471 (with-foreign-slots ((et:fd et:events et:revents) pollfd et:pollfd)
472 (setf et:fd fd
473 et:events (choose-poll-flags event-type))
474 (handler-case
475 (let ((ret (et:repeat-upon-condition-decreasing-timeout
476 ((et:eintr) tmp-timeout (timeout->milisec timeout))
477 (et:poll pollfd 1 tmp-timeout))))
478 (when (zerop ret)
479 (return-from wait-until-fd-ready '(:timeout))))
480 (et:unix-error ()
481 (return-from wait-until-fd-ready '(:error))))
482 (flags-case et:revents
483 ((et:pollout et:pollhup) (push :write status))
484 ((et:pollin et::pollrdhup et:pollpri) (push :read status))
485 ((et:pollerr et:pollnval) (push :error status)))
486 (return-from wait-until-fd-ready status))))))
488 (defun fd-ready-p (fd &optional (event-type :read))
489 (not (member :timeout (wait-until-fd-ready fd event-type 0) :test #'eq)))