Split common.lisp into multiple files.
[iolib.git] / io-multiplex / event-loop.lisp
blobb66ea5cde829993ff76b022068cdd0dec38f0e4f
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Indent-tabs-mode: NIL -*-
2 ;;;
3 ;;; event-loop.lisp --- Main event loop.
4 ;;;
5 ;;; Copyright (C) 2006-2007, Stelian Ionescu <sionescu@common-lisp.net>
6 ;;;
7 ;;; This code is free software; you can redistribute it and/or
8 ;;; modify it under the terms of the version 2.1 of
9 ;;; the GNU Lesser General Public License as published by
10 ;;; the Free Software Foundation, as clarified by the
11 ;;; preamble found here:
12 ;;; http://opensource.franz.com/preamble.html
13 ;;;
14 ;;; This program is distributed in the hope that it will be useful,
15 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ;;; GNU General Public License for more details.
18 ;;;
19 ;;; You should have received a copy of the GNU Lesser General
20 ;;; Public License along with this library; if not, write to the
21 ;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 ;;; Boston, MA 02110-1301, USA
24 (in-package :io.multiplex)
27 ;;;; EVENT-BASE
29 (defclass event-base ()
30 ((mux :initform (make-instance *default-multiplexer*)
31 :initarg :mux :reader mux-of)
32 (fds :initform (make-hash-table :test 'eql)
33 :reader fds-of)
34 (timeouts :initform (make-queue)
35 :reader timeouts-of)
36 (exit :initform nil
37 :accessor exit-p)
38 (exit-when-empty :initarg :exit-when-empty
39 :accessor exit-when-empty-p))
40 (:default-initargs :exit-when-empty nil)
41 (:documentation "An event base ..."))
43 (defmacro with-event-base ((var &rest initargs) &body body)
44 "Binds VAR to a new EVENT-BASE, instantiated with INITARGS,
45 within the extent of BODY. Closes VAR."
46 `(let ((,var (make-instance 'event-base ,@initargs)))
47 (unwind-protect
48 (progn ,@body)
49 (close ,var))))
51 (defmethod print-object ((base event-base) stream)
52 (print-unreadable-object (base stream :type nil :identity t)
53 (format stream "event base, ~A FDs monitored, using: ~A"
54 ;; kludge: quick fix for printing closed event bases
55 (when (fds-of base) (hash-table-count (fds-of base)))
56 (mux-of base))))
58 (defmethod initialize-instance :after ((base event-base) &key)
59 (with-slots (mux) base
60 (when (symbolp mux)
61 (setf mux (make-instance mux)))))
63 ;;; KLUDGE: CLOSE is for streams. --luis
64 ;;;
65 ;;; Also, we might want to close FDs here. Or have a version/argument
66 ;;; that handles that. Or... add finalizers to the fd streams.
67 (defmethod close ((event-base event-base) &key abort)
68 (declare (ignore abort))
69 (with-accessors ((mux mux-of)) event-base
70 (close-multiplexer mux)
71 (dolist (slot '(fds timeouts exit))
72 (setf (slot-value event-base slot) nil))
73 (values event-base)))
75 (defgeneric add-fd (base fd event-type function &key timeout one-shot)
76 (:documentation ""))
78 (defgeneric add-timeout (event-base function timeout &key persistent)
79 (:documentation ""))
81 (defgeneric remove-event (event-base event)
82 (:documentation ""))
84 (defgeneric remove-events (event-base event-list)
85 (:documentation ""))
87 (defgeneric event-dispatch (event-base &key timeout one-shot &allow-other-keys)
88 (:documentation ""))
90 (defgeneric exit-event-loop (event-base &key delay)
91 (:documentation "")
92 (:method ((event-base event-base) &key (delay 0))
93 (add-timeout event-base
94 #'(lambda (fd event-type)
95 (declare (ignore fd event-type))
96 (setf (exit-p event-base) t))
97 delay :persistent nil)))
99 (defgeneric event-base-empty-p (event-base)
100 (:documentation "Return T if no FD event or timeout is registered with EVENT-BASE.")
101 (:method ((event-base event-base))
102 (and (zerop (hash-table-count (fds-of event-base)))
103 (queue-empty-p (timeouts-of event-base)))))
105 (defgeneric fd-entry-of (event-base fd)
106 (:documentation "Return the FD-ENTRY of FD in EVENT-BASE.")
107 (:method ((event-base event-base) fd)
108 (gethash fd (fds-of event-base))))
110 (defun %add-event (event-base event &optional fd-entry)
111 (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
112 (when (event-timeout event)
113 ;; add the event to the timeout queue
114 (queue-sorted-insert timeouts event #'< #'event-abs-timeout))
115 (let ((fd (event-fd event)))
116 ;; if it's an FD event add it to its fd-entry int the FDs hash-table
117 ;; if there's no such fd-entry, create it
118 (when fd
119 (fd-entry-add-event fd-entry event)
120 (setf (gethash fd fds) fd-entry))
121 (values event))))
123 (defun %remove-event (event-base event)
124 (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
125 (when (event-timeout event)
126 ;; remove the event from the timeout queue
127 (queue-delete timeouts event))
128 (let ((fd (event-fd event)))
129 ;; if it's an FD event remove it from its fd-entry
130 ;; if the fd-emtry is then empty, remove it
131 (when fd
132 (let ((fd-entry (gethash fd fds)))
133 (assert fd-entry)
134 (fd-entry-del-event fd-entry event)
135 (when (fd-entry-empty-p fd-entry)
136 (remhash fd fds))))
137 (values event))))
139 (defun calc-possible-edge-change-when-adding (fd-entry event-type)
140 (cond ((and (eql event-type :read)
141 (queue-empty-p (fd-entry-read-events fd-entry)))
142 :read-add)
143 ((and (eql event-type :write)
144 (queue-empty-p (fd-entry-write-events fd-entry)))
145 :write-add)))
147 (defmethod add-fd ((event-base event-base) fd event-type function
148 &key timeout one-shot)
149 (check-type fd unsigned-byte)
150 (check-type event-type fd-event)
151 (let ((fd-limit (fd-limit-of (mux-of event-base))))
152 (when (and fd-limit (> fd fd-limit))
153 (error "Cannot add such a large FD: ~A" fd)))
154 (let ((current-entry (fd-entry-of event-base fd))
155 (event (make-event fd event-type function (not one-shot)
156 (abs-timeout timeout)
157 (normalize-timeout timeout)))
158 (edge-change nil))
159 (if current-entry
160 (progn
161 (setf edge-change (calc-possible-edge-change-when-adding
162 current-entry event-type))
163 (%add-event event-base event current-entry)
164 (when edge-change
165 (setf (fd-entry-edge-change current-entry) edge-change)
166 (update-fd (mux-of event-base) current-entry)
167 (setf (fd-entry-edge-change current-entry) nil)))
168 (progn
169 (setf current-entry (make-fd-entry fd))
170 (%add-event event-base event current-entry)
171 (unless (monitor-fd (mux-of event-base) current-entry)
172 (%remove-event event-base event))))
173 (values event)))
175 (defmethod add-timeout ((event-base event-base) function timeout
176 &key persistent)
177 (assert timeout)
178 (%add-event event-base
179 (make-event nil :timeout function persistent
180 (abs-timeout timeout) (normalize-timeout timeout))))
182 (defun calc-possible-edge-change-when-removing (fd-entry event-type)
183 (cond ((and (eql event-type :read)
184 (not (queue-empty-p (fd-entry-read-events fd-entry))))
185 :read-del)
186 ((and (eql event-type :write)
187 (not (queue-empty-p (fd-entry-write-events fd-entry))))
188 :write-del)))
190 (defmethod remove-event ((event-base event-base) event)
191 (check-type (event-type event) event-type)
192 (let* ((fd (event-fd event))
193 (current-entry (fd-entry-of event-base fd))
194 (edge-change nil))
195 (if current-entry
196 (progn
197 (setf edge-change (calc-possible-edge-change-when-removing
198 current-entry (event-type event)))
199 (%remove-event event-base event)
200 (if (fd-entry-empty-p current-entry)
201 (unmonitor-fd (mux-of event-base) current-entry)
202 (when edge-change
203 (setf (fd-entry-edge-change current-entry) edge-change)
204 (update-fd (mux-of event-base) current-entry)
205 (setf (fd-entry-edge-change current-entry) nil))))
206 (%remove-event event-base event)))
207 (values event-base))
209 (defmacro with-fd-handler ((event-base fd event-type function &optional timeout)
210 &body body)
212 (once-only (event-base)
213 (with-unique-names (event)
214 `(let (,event)
215 (unwind-protect
216 (progn
217 (setf ,event (add-fd ,event-base ,fd ,event-type ,function
218 :timeout ,timeout))
219 ,@body)
220 (when ,event
221 (remove-event ,event-base ,event)))))))
223 (defmethod event-dispatch :around ((event-base event-base)
224 &key timeout one-shot)
225 (setf (exit-p event-base) nil)
226 (when timeout
227 (exit-event-loop event-base :delay timeout))
228 (call-next-method event-base :one-shot one-shot))
230 (defun recalculate-timeouts (timeouts)
231 (let ((now (osicat:get-monotonic-time)))
232 (dolist (ev (queue-head timeouts))
233 (event-recalc-abs-timeout ev now))))
235 (defun dispatch-timeouts (dispatch-list)
236 (dolist (ev dispatch-list)
237 (funcall (event-handler ev) nil :timeout)))
239 (defmethod remove-events ((event-base event-base) event-list)
240 (dolist (ev event-list)
241 (remove-event event-base ev)))
243 (defvar *maximum-event-loop-timeout* 1)
245 (defmethod event-dispatch ((event-base event-base) &key one-shot)
246 (with-accessors ((mux mux-of) (fds fds-of)
247 (exit-p exit-p) (exit-when-empty exit-when-empty-p)
248 (timeouts timeouts-of)) event-base
249 (flet ((recalc-poll-timeout ()
250 (calc-min-timeout (events-calc-min-rel-timeout timeouts)
251 *maximum-event-loop-timeout*)))
252 (do ((poll-timeout (recalc-poll-timeout) (recalc-poll-timeout))
253 (deletion-list () ())
254 (dispatch-list () ()))
255 ((or exit-p (and exit-when-empty (event-base-empty-p event-base))))
256 (recalculate-timeouts timeouts)
257 (when (and (dispatch-fd-events-once event-base poll-timeout) one-shot)
258 (setf exit-p t))
259 (setf (values deletion-list dispatch-list)
260 (filter-expired-events
261 (expired-events timeouts (osicat:get-monotonic-time))))
262 (when (and dispatch-list one-shot)
263 (setf exit-p t))
264 (dispatch-timeouts dispatch-list)
265 (remove-events event-base deletion-list)
266 (queue-sort timeouts #'< #'event-abs-timeout)))))
268 ;;; Waits for events and dispatches them. Returns T if some events
269 ;;; have been received, NIL otherwise.
270 (defun dispatch-fd-events-once (event-base timeout)
271 (with-accessors ((mux mux-of) (fds fds-of) (timeouts timeouts-of))
272 event-base
273 (let ((deletion-list ())
274 (fd-events (harvest-events mux timeout)))
275 (dolist (ev fd-events)
276 (destructuring-bind (fd ev-types) ev
277 (let ((fd-entry (fd-entry-of event-base fd)))
278 (if fd-entry
279 (let ((errorp (member :error ev-types)))
280 (when errorp
281 (dispatch-error-events fd-entry)
282 (nconcf deletion-list
283 (fd-entry-all-events fd-entry)))
284 (when (member :read ev-types)
285 (dispatch-read-events fd-entry)
286 (or errorp
287 (nconcf deletion-list
288 (fd-entry-one-shot-events fd-entry :read))))
289 (when (member :write ev-types)
290 (dispatch-write-events fd-entry)
291 (or errorp
292 (nconcf deletion-list
293 (fd-entry-one-shot-events fd-entry :write)))))
294 (warn "Got spurious event for non-monitored FD: ~A" fd)))))
295 (dolist (ev deletion-list)
296 (remove-event event-base ev))
297 (consp fd-events))))
299 (defun expired-events (queue now)
300 (queue-filter queue
301 #'(lambda (to) (and to (<= to now)))
302 #'event-abs-timeout))
304 (defun filter-expired-events (events)
305 (let ((deletion-list ())
306 (dispatch-list ()))
307 (dolist (ev events)
308 (push ev dispatch-list)
309 (unless (event-persistent-p ev)
310 (push ev deletion-list)))
311 (values deletion-list dispatch-list)))
313 (defun events-calc-min-rel-timeout (timeouts)
314 (let* ((now (osicat:get-monotonic-time))
315 (first-valid-event (find-if #'(lambda (to)
316 (or (null to) (< now to)))
317 (queue-head timeouts)
318 :key #'event-abs-timeout)))
319 (when (and first-valid-event
320 (event-abs-timeout first-valid-event))
321 (- (event-abs-timeout first-valid-event) now))))
323 (defun dispatch-error-events (fd-entry)
324 (dolist (ev (queue-head (fd-entry-error-events fd-entry)))
325 (funcall (event-handler ev) (fd-entry-fd fd-entry) :error)))
327 (defun dispatch-read-events (fd-entry)
328 (dolist (ev (queue-head (fd-entry-read-events fd-entry)))
329 (funcall (event-handler ev) (fd-entry-fd fd-entry) :read)))
331 (defun dispatch-write-events (fd-entry)
332 (dolist (ev (queue-head (fd-entry-write-events fd-entry)))
333 (funcall (event-handler ev) (fd-entry-fd fd-entry) :write)))