Rewrite buffering.
[iolib.git] / io.streams / zeta / buffer.lisp
blobf65d6c438e72565a5e9d6ab5ff94a3883e153803
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*-
2 ;;;
3 ;;; --- Device buffers.
4 ;;;
6 (in-package :io.zeta-streams)
8 ;;;-----------------------------------------------------------------------------
9 ;;; Buffer Classes and Types
10 ;;;-----------------------------------------------------------------------------
12 (defclass buffer ()
13 ((synchronized :initarg :synchronized :reader buffer-synchronized-p)
14 (input-iobuf :initarg :input-buffer :accessor input-iobuf-of)
15 (output-iobuf :initarg :output-buffer :accessor output-iobuf-of))
16 (:default-initargs :synchronized nil))
18 (defclass single-channel-buffer (single-channel-device buffer)
19 ((last-io-op :initform nil :accessor last-io-op-of)))
21 (defclass dual-channel-buffer (dual-channel-device buffer) ())
24 ;;;-----------------------------------------------------------------------------
25 ;;; Buffer Generic Functions
26 ;;;-----------------------------------------------------------------------------
28 (defgeneric buffer-clear-input (buffer))
30 (defgeneric buffer-clear-output (buffer))
32 (defgeneric buffer-fill-input (buffer &optional timeout))
34 (defgeneric buffer-flush-output (buffer &optional timeout))
36 ;;; Internal functions
38 (defgeneric buffer-read-octets (buffer vector start end timeout))
40 (defgeneric buffer-write-octets (buffer vector start end timeout))
42 (defgeneric %buffer-clear-input (buffer))
44 (defgeneric %buffer-fill-input (buffer timeout))
46 (defgeneric %buffer-flush-output (buffer timeout))
49 ;;;-----------------------------------------------------------------------------
50 ;;; Helper macros
51 ;;;-----------------------------------------------------------------------------
53 (defmacro with-synchronized-single-channel-buffer ((buffer) &body body)
54 (with-gensyms (body-fun)
55 `(flet ((,body-fun () ,@body))
56 (if (buffer-synchronized-p ,buffer)
57 (bt:with-lock-held ((iobuf-lock (input-iobuf-of ,buffer)))
58 (,body-fun))
59 (,body-fun)))))
61 (defmacro with-synchronized-dual-channel-buffer ((buffer &optional direction)
62 &body body)
63 (with-gensyms (body-fun)
64 (labels ((make-locks (body direction)
65 (case direction
66 (:input
67 `(bt:with-lock-held ((iobuf-lock (input-iobuf-of ,buffer)))
68 ,body))
69 (:output
70 `(bt:with-lock-held ((iobuf-lock (output-iobuf-of ,buffer)))
71 ,body))
72 (otherwise
73 (make-locks (make-locks body :output) :input)))))
74 `(flet ((,body-fun () ,@body))
75 (if (buffer-synchronized-p ,buffer)
76 ,(make-locks `(,body-fun) direction)
77 (,body-fun))))))
80 ;;;-----------------------------------------------------------------------------
81 ;;; Buffer Constructors
82 ;;;-----------------------------------------------------------------------------
84 (defmethod initialize-instance :after
85 ((device single-channel-buffer) &key buffer buffer-size)
86 (with-accessors ((input-iobuf input-iobuf-of)
87 (output-iobuf output-iobuf-of))
88 device
89 (check-type buffer (or null iobuf))
90 (setf input-iobuf (or buffer (make-iobuf buffer-size))
91 output-iobuf input-iobuf)))
93 (defmethod initialize-instance :after
94 ((device dual-channel-buffer) &key input-buffer output-buffer
95 input-buffer-size output-buffer-size)
96 (with-accessors ((input-iobuf input-iobuf-of)
97 (output-iobuf output-iobuf-of))
98 device
99 (check-type input-buffer (or null iobuf))
100 (check-type output-buffer (or null iobuf))
101 (setf input-iobuf (or input-buffer (make-iobuf input-buffer-size)))
102 (setf output-iobuf (or output-buffer (make-iobuf output-buffer-size)))))
105 ;;;-----------------------------------------------------------------------------
106 ;;; Buffer DEVICE-CLOSE
107 ;;;-----------------------------------------------------------------------------
109 (defmethod device-close ((buffer single-channel-buffer) &optional abort)
110 (with-accessors ((handle input-handle-of))
111 buffer
112 (with-synchronized-single-channel-buffer (buffer)
113 (unless (or abort (eql :read (last-io-op-of buffer)))
114 (%buffer-flush-output buffer 0))
115 (device-close handle)))
116 (values buffer))
118 (defmethod device-close ((buffer buffer) &optional abort)
119 (with-accessors ((input-handle input-handle-of buffer)
120 (output-handle output-handle-of buffer))
121 buffer
122 (with-synchronized-dual-channel-buffer (buffer)
123 (unless abort
124 (%buffer-flush-output buffer 0))
125 (device-close input-handle)
126 (device-close output-handle)))
127 (values buffer))
129 ;;;-----------------------------------------------------------------------------
130 ;;; Buffer DEVICE-READ
131 ;;;-----------------------------------------------------------------------------
133 (defmethod device-read ((buffer single-channel-buffer) vector start end
134 &optional timeout)
135 (when (= start end) (return-from device-read 0))
136 (with-synchronized-single-channel-buffer (buffer)
137 ;; If the previous operation was a write, try to flush the output buffer.
138 ;; If the buffer couldn't be flushed entirely, signal an error
139 (synchronize-input buffer)
140 (buffer-read-octets buffer buffer start end timeout)))
142 (defmethod device-read ((buffer dual-channel-buffer) vector start end
143 &optional timeout)
144 (when (= start end) (return-from device-read 0))
145 (with-synchronized-dual-channel-buffer (buffer :input)
146 (buffer-read-octets buffer buffer start end timeout)))
148 (defmethod buffer-read-octets ((buffer buffer) vector start end timeout)
149 (with-accessors ((input-handle input-handle-of)
150 (input-iobuf input-iobuf-of)
151 (output-handle output-handle-of)
152 (output-iobuf output-iobuf-of))
153 buffer
154 (cond
155 ((iobuf-empty-p input-iobuf)
156 (let ((nbytes
157 (%buffer-fill-input buffer timeout)))
158 (if (iobuf-empty-p input-iobuf)
159 (if (eql :eof nbytes) :eof 0)
160 (iobuf->vector input-iobuf vector start end))))
162 (iobuf->vector input-iobuf vector start end)))))
165 ;;;-----------------------------------------------------------------------------
166 ;;; Buffer DEVICE-WRITE
167 ;;;-----------------------------------------------------------------------------
169 (defmethod device-write ((buffer single-channel-buffer) vector start end
170 &optional timeout)
171 (when (= start end) (return-from device-write 0))
172 (with-synchronized-single-channel-buffer (buffer)
173 ;; If the previous operation was a read, flush the read buffer
174 ;; and reposition the file offset accordingly
175 (%buffer-clear-input buffer)
176 (buffer-write-octets buffer vector start end timeout)))
178 (defmethod device-write ((buffer dual-channel-buffer) vector start end
179 &optional timeout)
180 (when (= start end) (return-from device-write 0))
181 (with-synchronized-dual-channel-buffer (buffer :output)
182 (buffer-write-octets buffer vector start end timeout)))
184 (defmethod buffer-write-octets ((buffer buffer) vector start end timeout)
185 (with-accessors ((output-handle output-handle-of)
186 (output-iobuf output-iobuf-of))
187 buffer
188 (prog1
189 (vector->iobuf output-iobuf vector start end)
190 (setf (last-io-op-of buffer) :write)
191 (when (iobuf-full-p output-iobuf)
192 (%buffer-flush-output buffer timeout)))))
195 ;;;-----------------------------------------------------------------------------
196 ;;; Buffer DEVICE-POSITION
197 ;;;-----------------------------------------------------------------------------
199 (defmethod device-position ((buffer single-channel-buffer))
200 (with-synchronized-single-channel-buffer (buffer)
201 (%buffer-position buffer)))
203 (defun %buffer-position (buffer)
204 (let ((position (device-position (input-handle-of buffer))))
205 (ecase (last-io-op-of buffer)
206 (:read
207 (- position (iobuf-available-octets (input-iobuf-of buffer))))
208 (:write
209 (+ position (iobuf-available-octets (output-iobuf-of buffer)))))))
211 (defmethod (setf device-position) (position (buffer single-channel-buffer) &key (from :start))
212 (setf (%buffer-position buffer from) position))
214 (defun (setf %buffer-position) (position buffer from)
215 (setf (device-position (input-handle-of buffer) :from from) position))
218 ;;;-----------------------------------------------------------------------------
219 ;;; BUFFER CLEAR-INPUT
220 ;;;-----------------------------------------------------------------------------
222 (defmethod buffer-clear-input ((buffer single-channel-buffer))
223 (with-synchronized-single-channel-buffer (buffer)
224 (%buffer-clear-input buffer)))
226 (defmethod %buffer-clear-input ((buffer single-channel-buffer))
227 (when (eql :read (last-io-op-of buffer))
228 (let ((nbytes (iobuf-available-octets (input-iobuf-of buffer))))
229 (unless (zerop nbytes)
230 (setf (%buffer-position buffer :current) (- nbytes)))
231 (iobuf-reset (input-iobuf-of buffer)))))
233 (defmethod buffer-clear-input ((buffer buffer))
234 (with-synchronized-dual-channel-buffer (buffer :input)
235 (%buffer-clear-input buffer)))
237 (defmethod %buffer-clear-input ((buffer dual-channel-buffer))
238 (iobuf-reset (input-iobuf-of buffer)))
241 ;;;-----------------------------------------------------------------------------
242 ;;; BUFFER CLEAR-OUTPUT
243 ;;;-----------------------------------------------------------------------------
245 (defmethod buffer-clear-output ((buffer single-channel-buffer))
246 (with-synchronized-single-channel-buffer (buffer)
247 (when (eql :write (last-io-op-of buffer))
248 (iobuf-reset (output-iobuf-of buffer)))))
250 (defmethod buffer-clear-output ((buffer dual-channel-buffer))
251 (with-synchronized-dual-channel-buffer (buffer :output)
252 (iobuf-reset (output-iobuf-of buffer))))
255 ;;;-----------------------------------------------------------------------------
256 ;;; BUFFER FILL-INPUT
257 ;;;-----------------------------------------------------------------------------
259 (defmethod buffer-fill-input ((buffer single-channel-buffer) &optional timeout)
260 (with-synchronized-single-channel-buffer (buffer)
261 ;; If the previous operation was a write, try to flush the output buffer.
262 ;; If the buffer couldn't be flushed entirely, signal an error
263 (synchronize-input buffer)
264 (%buffer-fill-input buffer timeout)))
266 (defun synchronize-input (buffer)
267 (when (and (eql :write (last-io-op-of buffer))
268 (plusp (%buffer-flush-output buffer 0)))
269 ;; FIXME: What do we do now ???
270 (error "Could not flush the entire write buffer !"))
271 (iobuf-reset (output-iobuf-of buffer)))
273 (defmethod buffer-fill-input ((buffer dual-channel-buffer) &optional timeout)
274 (with-synchronized-dual-channel-buffer (buffer :input)
275 (%buffer-fill-input buffer timeout)))
277 (defmethod %buffer-fill-input ((buffer buffer) timeout)
278 (with-accessors ((input-handle input-handle-of)
279 (input-iobuf input-iobuf-of))
280 buffer
281 (multiple-value-bind (data start end)
282 (iobuf-next-empty-zone input-iobuf)
283 (let ((nbytes
284 (device-read input-handle data start end timeout)))
285 (setf (iobuf-end input-iobuf) (+ start nbytes))
286 (setf (last-io-op-of buffer) :read)
287 (values nbytes)))))
290 ;;;-----------------------------------------------------------------------------
291 ;;; BUFFER FLUSH-OUTPUT
292 ;;;-----------------------------------------------------------------------------
294 (defmethod buffer-flush-output ((buffer single-channel-buffer) &optional timeout)
295 (with-synchronized-single-channel-buffer (buffer)
296 (when (eql :write (last-io-op-of buffer))
297 (%buffer-flush-output buffer timeout))))
299 (defmethod buffer-flush-output ((buffer dual-channel-buffer) &optional timeout)
300 (with-synchronized-dual-channel-buffer (buffer :output)
301 (%buffer-flush-output buffer timeout)))
303 (defmethod %buffer-flush-output ((buffer dual-channel-buffer) timeout)
304 (with-accessors ((output-handle output-handle-of)
305 (output-iobuf output-iobuf-of))
306 buffer
307 (multiple-value-bind (data start end)
308 (iobuf-next-data-zone output-iobuf)
309 (let ((nbytes
310 (device-write output-handle data start end timeout)))
311 (setf (iobuf-start output-iobuf) (+ start nbytes))
312 (setf (last-io-op-of buffer) :write)
313 (iobuf-available-octets output-iobuf)))))