1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*-
3 ;;; --- Device buffers.
6 (in-package :io.zeta-streams
)
8 ;;;-------------------------------------------------------------------------
10 ;;;-------------------------------------------------------------------------
(defclass device-buffer (buffer)
  ((synchronized
    :initarg :synchronized
    :reader %db-synchronized-p
    :documentation "Value reported by ZSTREAM-SYNCHRONIZED-P; defaults to NIL.")
   (device
    :initarg :device
    ;; The accessor had been lost from this slot spec, leaving it unclosed.
    ;; (SETF ZSTREAM-DEVICE) stores through %DB-DEVICE, so it is defined here.
    :accessor %db-device
    :documentation "The device this buffer reads from and writes to.")
   (input-iobuf
    :initarg :input-buffer
    :accessor %db-input-iobuf
    :documentation "IOBUF that receives data read from the device.")
   (output-iobuf
    :initarg :output-buffer
    :accessor %db-output-iobuf
    :documentation "IOBUF holding data waiting to be written to the device.")
   (buffering
    :initarg :buffering
    :accessor %db-buffering
    :documentation "Buffering mode; checked against STREAM-BUFFERING at initialization."))
  (:default-initargs :synchronized nil)
  (:documentation "A BUFFER that performs its I/O through a device."))
(defclass single-channel-buffer (device-buffer)
  ((dirtyp
    :accessor %scb-dirtyp
    :initform nil
    :documentation "Set to T after a write into the buffer and reset to NIL once
a flush leaves the output iobuf empty."))
  (:documentation "Device buffer whose input and output share a single iobuf."))
(defclass dual-channel-buffer (device-buffer)
  ;; The (required) slot list and the closing paren were missing; the class
  ;; adds no slots of its own.
  ()
  (:documentation "Device buffer with separate input and output iobufs."))
(defclass memory-buffer (buffer)
  ((data-vector
    :accessor %mb-data-vector
    :documentation "Vector backing the buffer.")
   (element-type
    :accessor %mb-element-type
    :documentation "Element type recorded for the buffer at initialization.")
   (input-position
    :accessor %mb-input-position
    :initform 0
    :documentation "Index in DATA-VECTOR of the next element to read.")
   (output-position
    :accessor %mb-output-position
    :initform 0
    :documentation "Index in DATA-VECTOR at which the next write lands.")
   (adjust-size
    :accessor %mb-adjust-size
    :documentation "Factor applied to the required size when the vector is enlarged.")
   (adjust-threshold
    :accessor %mb-adjust-threshold
    :documentation "Factor applied to the required size before it is compared
with the vector's length."))
  (:documentation "A BUFFER that keeps its data in an in-memory vector."))
(defclass octet-memory-buffer (memory-buffer)
  ;; DEFCLASS needs a slot list before any class option; without it the
  ;; :DEFAULT-INITARGS form was being read as the slot list.
  ()
  (:default-initargs :element-type 'octet)
  (:documentation "MEMORY-BUFFER whose element type defaults to OCTET."))
(defclass character-memory-buffer (memory-buffer)
  ;; DEFCLASS needs a slot list before any class option; without it the
  ;; :DEFAULT-INITARGS form was being read as the slot list.
  ()
  (:default-initargs :element-type 'character)
  (:documentation "MEMORY-BUFFER whose element type defaults to CHARACTER."))
54 ;;;-------------------------------------------------------------------------
56 ;;;-------------------------------------------------------------------------
(defgeneric zstream-synchronized-p (buffer)
  (:documentation
   "Return true when operations on BUFFER must hold its iobuf locks."))

(defgeneric zstream-device (buffer)
  (:documentation
   "Return the device BUFFER performs its I/O on."))

(defgeneric (setf zstream-device) (new-device buffer)
  (:documentation
   "Make NEW-DEVICE the device of BUFFER."))
(defgeneric zstream-read-element (buffer &key timeout)
  (:documentation
   "Read a single element from BUFFER."))

(defgeneric zstream-write-element (buffer element &key timeout)
  (:documentation
   "Write the single element ELEMENT to BUFFER."))

(defgeneric zstream-read-vector (buffer vector &key start end timeout)
  (:documentation
   "Read elements from BUFFER into VECTOR between START and END."))

(defgeneric zstream-write-vector (buffer vector &key start end timeout)
  (:documentation
   "Write the elements of VECTOR between START and END to BUFFER."))
;;; Device buffer functions
(defgeneric zstream-position (buffer &key direction)
  (:documentation
   "Return the current position of BUFFER."))

(defgeneric (setf zstream-position) (position buffer &key direction from)
  (:documentation
   "Move BUFFER to POSITION, interpreted relative to FROM."))

(defgeneric zstream-poll (buffer &key direction timeout)
  (:documentation
   "Poll BUFFER for readiness in DIRECTION."))

(defgeneric zstream-fill-input (buffer &key timeout)
  (:documentation
   "Refill BUFFER's input from its device."))

(defgeneric zstream-flush-output (buffer &key timeout)
  (:documentation
   "Write BUFFER's pending output to its device."))

(defgeneric zstream-clear-input (buffer)
  (:documentation
   "Discard the input currently held in BUFFER."))

(defgeneric zstream-clear-output (buffer)
  (:documentation
   "Discard the output currently held in BUFFER."))
;;; Internal functions
(defgeneric %zstream-read-vector (buffer vector start end timeout)
  (:documentation
   "Worker for ZSTREAM-READ-VECTOR; takes all of its arguments positionally."))

(defgeneric %zstream-write-vector (buffer vector start end timeout)
  (:documentation
   "Worker for ZSTREAM-WRITE-VECTOR; takes all of its arguments positionally."))

(defgeneric %zstream-fill-input (buffer timeout)
  (:documentation
   "Worker for ZSTREAM-FILL-INPUT."))

(defgeneric %zstream-flush-output (buffer timeout)
  (:documentation
   "Worker for ZSTREAM-FLUSH-OUTPUT."))

(defgeneric %zstream-clear-input (buffer)
  (:documentation
   "Worker for ZSTREAM-CLEAR-INPUT."))

(defgeneric %zstream-clear-output (buffer)
  (:documentation
   "Worker for ZSTREAM-CLEAR-OUTPUT."))

(defgeneric %ensure-memory-buffer-capacity (buffer &optional amount)
  (:documentation
   "Enlarge BUFFER's data vector when AMOUNT more elements would not fit."))

(defgeneric %check-memory-buffer-available-data (buffer &optional amount)
  (:documentation
   "Signal an error unless BUFFER holds at least AMOUNT unread elements."))
111 ;;;-------------------------------------------------------------------------
113 ;;;-------------------------------------------------------------------------
(defmethod zstream-synchronized-p ((buffer device-buffer))
  "Return the value of BUFFER's SYNCHRONIZED slot."
  (with-accessors ((synchronizedp %db-synchronized-p))
      buffer
    synchronizedp))
(defmethod zstream-synchronized-p ((buffer memory-buffer))
  "Memory buffers are never synchronized: always return NIL."
  (declare (ignore buffer))
  ;; The result form and the closing paren were missing.  BUFFER is ignored,
  ;; so the result is a constant; NIL matches the FIXME on
  ;; WITH-SYNCHRONIZED-BUFFER about memory buffers not being synchronized.
  (values nil))
(defmethod zstream-device ((buffer device-buffer))
  "Return the device stored in BUFFER."
  ;; The body was missing; it mirrors the (SETF ZSTREAM-DEVICE) method, which
  ;; stores through %DB-DEVICE.
  (%db-device buffer))
(defmethod zstream-device ((buffer memory-buffer))
  "Memory buffers have no device: always return NIL."
  (declare (ignore buffer))
  ;; NOTE(review): the result form was missing.  BUFFER is ignored, so the
  ;; result is a constant; NIL is assumed -- confirm.
  (values nil))
(defmethod (setf zstream-device) (new-device (buffer device-buffer))
  "Store NEW-DEVICE as BUFFER's device and return it."
  (with-accessors ((device %db-device))
      buffer
    (setf device new-device)))
(defmethod (setf zstream-device) (new-device (buffer memory-buffer))
  "Do nothing: a memory buffer has no device to replace.  Returns NIL."
  (declare (ignore new-device buffer))
  ;; NOTE(review): the result form was missing.  Both arguments are ignored,
  ;; so NEW-DEVICE cannot be the result; NIL is assumed -- confirm.
  (values nil))
137 ;;;-------------------------------------------------------------------------
139 ;;;-------------------------------------------------------------------------
;; FIXME: should memory buffers be synchronized too?
(defmacro with-synchronized-buffer ((buffer &optional direction) &body body)
  "Run BODY, holding BUFFER's iobuf lock(s) when BUFFER is synchronized.

DIRECTION selects the lock: :INPUT takes the input iobuf's lock, :OUTPUT the
output iobuf's lock, and :IO both, with the input lock outermost.  When
ZSTREAM-SYNCHRONIZED-P returns false, BODY runs without any locking.
BUFFER is evaluated exactly once."
  ;; NOTE(review): the lock-taking forms, the dispatch on DIRECTION and the
  ;; unsynchronized branch were missing.  They are rebuilt around the
  ;; surviving `((iobuf-lock ...))' lock lists; BT:WITH-LOCK-HELD and ECASE
  ;; are assumptions -- confirm against the project's threading layer.
  (with-gensyms (body-fun buffer-var)
    (labels ((make-locks (body direction)
               (ecase direction
                 (:input
                  `(bt:with-lock-held
                       ((iobuf-lock (%db-input-iobuf ,buffer-var)))
                     ,body))
                 (:output
                  `(bt:with-lock-held
                       ((iobuf-lock (%db-output-iobuf ,buffer-var)))
                     ,body))
                 (:io
                  (make-locks (make-locks body :output) :input)))))
      ;; Bind BUFFER once so a form with side effects is not re-evaluated by
      ;; the synchronization test and by each lock lookup.
      `(let ((,buffer-var ,buffer))
         (flet ((,body-fun () ,@body))
           (declare (dynamic-extent #',body-fun))
           (if (zstream-synchronized-p ,buffer-var)
               ,(make-locks `(,body-fun) direction)
               (,body-fun)))))))
163 ;;;-------------------------------------------------------------------------
165 ;;;-------------------------------------------------------------------------
(defmethod shared-initialize :after
    ((buffer single-channel-buffer) slot-names
     &key data size buffering)
  "Validate the initargs and give BUFFER one iobuf shared by both directions.

DATA, when non-NIL, must be an IOBUF and is used as is; otherwise a new iobuf
is created with (MAKE-IOBUF SIZE).  The output iobuf is set to the very same
object as the input iobuf."
  (declare (ignore slot-names))
  (with-accessors ((device zstream-device)
                   (input-iobuf %db-input-iobuf)
                   (output-iobuf %db-output-iobuf))
      ;; The instance form had been dropped, which turned the first
      ;; CHECK-TYPE form into the WITH-ACCESSORS instance.
      buffer
    (check-type device device)
    (check-type data (or null iobuf))
    (check-type buffering stream-buffering)
    (setf input-iobuf (or data (make-iobuf size))
          output-iobuf input-iobuf)))
(defmethod shared-initialize :after
    ((buffer dual-channel-buffer) slot-names
     &key input-data output-data input-size output-size buffering)
  "Validate the initargs and give BUFFER separate input and output iobufs.

INPUT-DATA and OUTPUT-DATA, when non-NIL, must be IOBUFs and are used as is;
a missing one is created with MAKE-IOBUF from INPUT-SIZE or OUTPUT-SIZE."
  (declare (ignore slot-names))
  (with-accessors ((device zstream-device)
                   (input-iobuf %db-input-iobuf)
                   (output-iobuf %db-output-iobuf))
      ;; The instance form had been dropped, which turned the first
      ;; CHECK-TYPE form into the WITH-ACCESSORS instance.
      buffer
    (check-type device device)
    (check-type input-data (or null iobuf))
    (check-type output-data (or null iobuf))
    (check-type buffering stream-buffering)
    (setf input-iobuf (or input-data (make-iobuf input-size)))
    (setf output-iobuf (or output-data (make-iobuf output-size)))))
(defmethod shared-initialize :after
    ((buffer memory-buffer) slot-names
     &key data (start 0) end (element-type t)
       (adjust-size 1.5) (adjust-threshold 1))
  "Record the growth parameters and set up BUFFER's data vector.

ADJUST-SIZE must be a real >= 1.001 and ADJUST-THRESHOLD a real in [0.1, 1].
With DATA, a new vector of ADJUST-SIZE times DATA's length is allocated, the
elements of DATA between START and END are copied to its beginning and the
output position is set to their count.  Without DATA, an empty vector of 128
elements is allocated."
  (declare (ignore slot-names))
  (check-type adjust-size (real 1.001))
  (check-type adjust-threshold (real 0.1 1))
  ;; NOTE(review): the argument of UPGRADED-ARRAY-ELEMENT-TYPE, the
  ;; conditional skeleton (COND / DATA / T) and the guard around the ASSERT
  ;; were missing and have been rebuilt from the surviving forms and their
  ;; paren depth -- confirm.
  (setf (%mb-adjust-size buffer) adjust-size
        (%mb-adjust-threshold buffer) adjust-threshold
        (%mb-element-type buffer) (upgraded-array-element-type element-type))
  (cond
    (data
     ;; (- END START) below relies on CHECK-BOUNDS having replaced a NIL END
     ;; -- presumably with the length of DATA; verify.
     (check-bounds data start end)
     (when element-type
       ;; FIXME: signal proper condition
       (assert (subtypep element-type (array-element-type data))))
     (setf (%mb-data-vector buffer)
           (make-array (truncate (* adjust-size (length data)))
                       :element-type (or element-type
                                         (array-element-type data))))
     (setf (%mb-output-position buffer) (- end start))
     (replace (%mb-data-vector buffer) data :start2 start :end2 end))
    (t
     (setf (%mb-data-vector buffer)
           (make-array 128 :element-type element-type)))))
224 ;;;-------------------------------------------------------------------------
226 ;;;-------------------------------------------------------------------------
(defmethod relinquish :after ((buffer single-channel-buffer) &key abort)
  "Flush pending output with a zero timeout, then relinquish BUFFER's device.

ABORT is passed on to the device's RELINQUISH."
  (with-synchronized-buffer (buffer :input)
    ;; NOTE(review): the form wrapping the flush was missing (the flush call
    ;; carried one extra closing paren).  Skipping the flush on ABORT is an
    ;; assumption -- confirm.
    (unless abort
      (%zstream-flush-output buffer 0))
    (relinquish (zstream-device buffer) :abort abort)))
(defmethod relinquish :after ((buffer dual-channel-buffer) &key abort)
  "Flush pending output with a zero timeout, then relinquish BUFFER's device.

Both iobuf locks are held for the duration when BUFFER is synchronized.
ABORT is passed on to the device's RELINQUISH."
  (with-synchronized-buffer (buffer :io)
    ;; NOTE(review): the form wrapping the flush was missing (the flush call
    ;; carried one extra closing paren).  Skipping the flush on ABORT is an
    ;; assumption -- confirm.
    (unless abort
      (%zstream-flush-output buffer 0))
    (relinquish (zstream-device buffer) :abort abort)))
243 ;;;-------------------------------------------------------------------------
245 ;;;-------------------------------------------------------------------------
(defmethod zstream-read-element ((buffer device-buffer) &key timeout)
  "Read one octet from BUFFER through a one-element vector and return it."
  (let ((v (make-array 1 :element-type 'octet)))
    (declare (dynamic-extent v))
    (zstream-read-vector buffer v :timeout timeout)
    ;; The form returning the element was missing, leaving the LET unclosed.
    ;; NOTE(review): the value of ZSTREAM-READ-VECTOR is not inspected, so
    ;; when nothing was read this returns whatever V happened to hold.
    (aref v 0)))
(defmethod zstream-read-element ((buffer memory-buffer) &key timeout)
  "Read one element from BUFFER through a one-element vector and return it.

TIMEOUT is ignored."
  (declare (ignore timeout))
  (let ((v (make-array 1 :element-type (%mb-element-type buffer))))
    (declare (dynamic-extent v))
    (zstream-read-vector buffer v)
    ;; The form returning the element was missing, leaving the LET unclosed.
    (aref v 0)))
261 ;;;-------------------------------------------------------------------------
263 ;;;-------------------------------------------------------------------------
(defmethod zstream-read-vector :around ((buffer buffer) vector
                                        &key (start 0) end timeout)
  "Check START and END against VECTOR before the next method runs.

An empty range returns 0 without calling the next method; otherwise the next
method is called with START, END and TIMEOUT passed explicitly."
  (check-bounds vector start end)
  (when (zerop (- end start))
    (return* 0))
  (call-next-method buffer vector
                    :start start
                    :end end
                    :timeout timeout))
(defmethod zstream-read-vector
    ((buffer single-channel-buffer) vector &key start end timeout)
  "Call %ZSTREAM-READ-VECTOR inside WITH-SYNCHRONIZED-BUFFER for :INPUT."
  (with-synchronized-buffer (buffer :input)
    (%zstream-read-vector buffer vector start end timeout)))
(defmethod zstream-read-vector
    ((buffer dual-channel-buffer) vector &key start end timeout)
  "Call %ZSTREAM-READ-VECTOR inside WITH-SYNCHRONIZED-BUFFER for :INPUT."
  (with-synchronized-buffer (buffer :input)
    (%zstream-read-vector buffer vector start end timeout)))
(defmethod %zstream-read-vector ((buffer device-buffer) vector
                                 start end timeout)
  "Copy buffered input into VECTOR between START and END.

When the input iobuf is empty it is first refilled with %ZSTREAM-FILL-INPUT;
if it is still empty afterwards the result is :EOF when the refill returned
:EOF and 0 otherwise.  In every other case the value of IOBUF->VECTOR is
returned."
  ;; The tail of the lambda list, the WITH-ACCESSORS instance and the COND
  ;; skeleton were missing.  The lambda list follows the generic function;
  ;; the COND is implied by the surviving clause and the paren depth.
  (with-accessors ((input-iobuf %db-input-iobuf))
      buffer
    (cond
      ((iobuf-empty-p input-iobuf)
       (let ((nbytes (%zstream-fill-input buffer timeout)))
         (if (iobuf-empty-p input-iobuf)
             (if (eql :eof nbytes) :eof 0)
             (iobuf->vector input-iobuf vector start end))))
      (t
       (iobuf->vector input-iobuf vector start end)))))
(defmethod zstream-read-vector ((buffer memory-buffer) vector
                                &key start end timeout)
  "Copy unread elements of BUFFER into VECTOR between START and END.

At most (- END START) elements are copied, and never more than are available
between the input and output positions.  The input position is advanced by
the number of elements copied and its new value is returned.  An error is
signalled when no unread data is available.  TIMEOUT is ignored."
  (declare (ignore timeout))
  (with-accessors ((data-vector %mb-data-vector)
                   (input-position %mb-input-position)
                   (output-position %mb-output-position))
      ;; The instance form was missing.
      buffer
    (%check-memory-buffer-available-data buffer 1)
    ;; REPLACE takes the target bounds as :START1/:END1 and the source bounds
    ;; as :START2/:END2.  They had been swapped, so the caller's START/END
    ;; limited the data vector and the buffer positions indexed VECTOR.
    (replace vector data-vector
             :start1 start :end1 end
             :start2 input-position :end2 output-position)
    ;; The second operand of MIN was missing; the requested count is the only
    ;; quantity that can bound the advance besides the available data.
    ;; NOTE(review): the value is the new input position, not an element
    ;; count, although the :AROUND method returns 0 for an empty range.
    (incf input-position (min (- output-position input-position)
                              (- end start)))))
309 ;;;-------------------------------------------------------------------------
311 ;;;-------------------------------------------------------------------------
(defmethod zstream-write-element ((buffer device-buffer) octet &key timeout)
  "Write the single octet OCTET to BUFFER through a one-element vector.

Returns whatever ZSTREAM-WRITE-VECTOR returns."
  (check-type octet octet)
  ;; :INITIAL-CONTENTS requires a sequence of elements; handing it the octet
  ;; itself is an error.  :INITIAL-ELEMENT fills the one-element vector.
  (let ((v (make-array 1 :element-type 'octet :initial-element octet)))
    (declare (dynamic-extent v))
    (zstream-write-vector buffer v :timeout timeout)))
(defmethod zstream-write-element ((buffer memory-buffer) element &key timeout)
  "Write the single element ELEMENT to BUFFER through a one-element vector.

Returns whatever ZSTREAM-WRITE-VECTOR returns.  TIMEOUT is ignored."
  (declare (ignore timeout))
  ;; :INITIAL-CONTENTS requires a sequence of elements; handing it ELEMENT
  ;; itself is an error (or, for a sequence of the wrong length, a mismatch).
  ;; :INITIAL-ELEMENT fills the one-element vector.
  (let ((v (make-array 1 :element-type (%mb-element-type buffer)
                         :initial-element element)))
    (declare (dynamic-extent v))
    (zstream-write-vector buffer v)))
327 ;;;-------------------------------------------------------------------------
329 ;;;-------------------------------------------------------------------------
(defmethod zstream-write-vector :around ((buffer buffer) vector
                                         &key (start 0) end timeout)
  "Check START and END against VECTOR before the next method runs.

An empty range returns 0 without calling the next method; otherwise the next
method is called with START, END and TIMEOUT passed explicitly."
  (check-bounds vector start end)
  (when (zerop (- end start))
    (return* 0))
  (call-next-method buffer vector
                    :start start
                    :end end
                    :timeout timeout))
(defmethod zstream-write-vector
    ((buffer single-channel-buffer) vector &key start end timeout)
  "Clear buffered input, then call %ZSTREAM-WRITE-VECTOR, inside
WITH-SYNCHRONIZED-BUFFER for :OUTPUT."
  (with-synchronized-buffer (buffer :output)
    ;; A preceding read may have left data in the shared iobuf: drop it and
    ;; let %ZSTREAM-CLEAR-INPUT move the device position back accordingly.
    (%zstream-clear-input buffer)
    (%zstream-write-vector buffer vector start end timeout)))
(defmethod zstream-write-vector
    ((buffer dual-channel-buffer) vector &key start end timeout)
  "Call %ZSTREAM-WRITE-VECTOR inside WITH-SYNCHRONIZED-BUFFER for :OUTPUT."
  (with-synchronized-buffer (buffer :output)
    (%zstream-write-vector buffer vector start end timeout)))
(defmethod %zstream-write-vector ((buffer device-buffer) vector
                                  start end timeout)
  "Copy VECTOR between START and END into the output iobuf.

Returns the values of VECTOR->IOBUF.  If the output iobuf is full after the
copy, %ZSTREAM-FLUSH-OUTPUT is called with TIMEOUT."
  (with-accessors ((output-iobuf %db-output-iobuf))
      ;; The instance form had been dropped, which turned the
      ;; MULTIPLE-VALUE-PROG1 form into the WITH-ACCESSORS instance.
      buffer
    (multiple-value-prog1
        (vector->iobuf output-iobuf vector start end)
      (when (iobuf-full-p output-iobuf)
        (%zstream-flush-output buffer timeout)))))
(defmethod %zstream-write-vector :after
    ((buffer single-channel-buffer) vector start end timeout)
  "Mark BUFFER as dirty once a write has gone through."
  (declare (ignore vector start end timeout))
  (with-accessors ((dirtyp %scb-dirtyp))
      buffer
    (setf dirtyp t)))
(defmethod zstream-write-vector ((buffer memory-buffer) vector
                                 &key (start 0) end timeout)
  "Append the elements of VECTOR between START and END to BUFFER.

The data vector is enlarged first when needed.  The output position is
advanced by the number of elements written and its new value is returned.
TIMEOUT is ignored."
  (declare (ignore timeout))
  (with-accessors ((data-vector %mb-data-vector)
                   (output-position %mb-output-position))
      ;; The instance form was missing.
      buffer
    ;; Only the START..END range is copied, so that is the amount of room
    ;; needed and the distance to advance.  Using (LENGTH VECTOR) moved the
    ;; output position past the data actually written for partial ranges.
    (let ((count (- (or end (length vector)) start)))
      (%ensure-memory-buffer-capacity buffer count)
      (replace data-vector vector
               :start1 output-position
               :start2 start :end2 end)
      (incf output-position count))))
375 ;;;-------------------------------------------------------------------------
377 ;;;-------------------------------------------------------------------------
(defmethod zstream-position ((buffer single-channel-buffer) &key direction)
  "Return the device position corrected for the data held in the iobuf.

When BUFFER is dirty the octets available in the output iobuf are added to
the device position; otherwise the octets available in the input iobuf are
subtracted from it.  DIRECTION is ignored."
  (declare (ignore direction))
  (with-synchronized-buffer (buffer :input)
    (let ((position (device-position (zstream-device buffer))))
      ;; FIXME: signal proper condition
      (assert position (position)
              "A single-channel-buffer's device must not return a NULL device-position.")
      (cond
        ((%scb-dirtyp buffer)
         (+ position (iobuf-available-octets (%db-output-iobuf buffer))))
        (t
         (- position (iobuf-available-octets (%db-input-iobuf buffer))))))))
(defmethod zstream-position ((buffer dual-channel-buffer) &key direction)
  "Return the DEVICE-POSITION of BUFFER's device.  DIRECTION is ignored."
  (declare (ignore direction))
  (with-synchronized-buffer (buffer :io)
    (let ((device (zstream-device buffer)))
      (device-position device))))
(defmethod zstream-position ((buffer memory-buffer) &key direction)
  "Return BUFFER's input position for :INPUT and its output position for :OUTPUT."
  ;; NOTE(review): the dispatching form was missing.  ECASE is assumed, so
  ;; any other DIRECTION (including NIL) signals an error -- confirm.
  (ecase direction
    (:input (%mb-input-position buffer))
    (:output (%mb-output-position buffer))))
401 ;;;-------------------------------------------------------------------------
403 ;;;-------------------------------------------------------------------------
(defmethod (setf zstream-position)
    (position (buffer device-buffer) &key direction (from :start))
  "Set the device position of BUFFER to POSITION relative to FROM.

Delegates to (SETF %DB-POSITION) inside WITH-SYNCHRONIZED-BUFFER for :INPUT.
DIRECTION is ignored."
  (declare (ignore direction))
  (with-synchronized-buffer (buffer :input)
    (setf (%db-position buffer from) position)))
(defun (setf %db-position) (position buffer from)
  "Set the DEVICE-POSITION of BUFFER's device to POSITION relative to FROM."
  (let ((device (zstream-device buffer)))
    (setf (device-position device from) position)))
(defmethod (setf zstream-position)
    (offset (buffer memory-buffer) &key direction (from :start))
  "Move BUFFER's input or output position, selected by DIRECTION, by OFFSET.

FROM names the origin OFFSET is added to: the start of the data vector, the
current position of the selected direction, or the position of the opposite
direction.  A new input position is bounds-checked against the output
position; a new output position first ensures the data vector has room.
Returns the new position."
  ;; NOTE(review): about ten lines of this definition were missing.  The
  ;; outer dispatch on DIRECTION, the LET bindings of NEWPOS and the :START
  ;; clauses are rebuilt from the surviving :CURRENT/:OUTPUT/:INPUT clauses,
  ;; the (FROM :START) default and the paren depth; ECASE is assumed for both
  ;; dispatches -- confirm.
  (with-accessors ((data-vector %mb-data-vector)
                   (input-position %mb-input-position)
                   (output-position %mb-output-position))
      buffer
    (ecase direction
      (:input
       (let ((newpos
               (ecase from
                 (:start offset)
                 (:current (+ input-position offset))
                 (:output (+ output-position offset)))))
         (check-bounds data-vector newpos output-position)
         (setf input-position newpos)))
      (:output
       (let ((newpos
               (ecase from
                 (:start offset)
                 (:current (+ output-position offset))
                 (:input (+ input-position offset)))))
         ;; %ENSURE-MEMORY-BUFFER-CAPACITY checks that its amount is an
         ;; UNSIGNED-BYTE, so moving the output position backwards signals
         ;; a type error here.
         (%ensure-memory-buffer-capacity buffer (- newpos output-position))
         (setf output-position newpos))))))
439 ;;;-------------------------------------------------------------------------
441 ;;;-------------------------------------------------------------------------
(defmethod zstream-clear-input ((buffer device-buffer))
  "Call %ZSTREAM-CLEAR-INPUT inside WITH-SYNCHRONIZED-BUFFER for :INPUT."
  (with-synchronized-buffer (buffer :input)
    (%zstream-clear-input buffer)))
(defmethod %zstream-clear-input ((buffer single-channel-buffer))
  "Drop buffered input unless BUFFER is dirty.

The device position is first moved back by the number of octets still
available in the input iobuf, then the iobuf is reset."
  (when (not (%scb-dirtyp buffer))
    (let* ((iobuf (%db-input-iobuf buffer))
           (nbytes (iobuf-available-octets iobuf)))
      ;; Octets that were buffered but not consumed are given back to the
      ;; device by seeking backwards relative to the current position.
      (when (/= 0 nbytes)
        (setf (%db-position buffer :current) (- nbytes)))
      (iobuf-reset iobuf))))
(defmethod %zstream-clear-input ((buffer dual-channel-buffer))
  "Reset BUFFER's input iobuf."
  (let ((iobuf (%db-input-iobuf buffer)))
    (iobuf-reset iobuf)))
(defmethod zstream-clear-input ((buffer memory-buffer))
  "Skip all unread data by moving the input position up to the output position."
  (with-accessors ((input-position %mb-input-position)
                   (output-position %mb-output-position))
      buffer
    (setf input-position output-position)))
461 ;;;-------------------------------------------------------------------------
463 ;;;-------------------------------------------------------------------------
(defmethod zstream-clear-output ((buffer device-buffer))
  "Call %ZSTREAM-CLEAR-OUTPUT inside WITH-SYNCHRONIZED-BUFFER for :OUTPUT."
  (with-synchronized-buffer (buffer :output)
    (%zstream-clear-output buffer)))
(defmethod %zstream-clear-output ((buffer single-channel-buffer))
  "Reset BUFFER's output iobuf, but only when BUFFER is dirty."
  (unless (not (%scb-dirtyp buffer))
    (let ((iobuf (%db-output-iobuf buffer)))
      (iobuf-reset iobuf))))
(defmethod %zstream-clear-output ((buffer dual-channel-buffer))
  "Reset BUFFER's output iobuf."
  (let ((iobuf (%db-output-iobuf buffer)))
    (iobuf-reset iobuf)))
(defmethod zstream-clear-output ((buffer memory-buffer))
  "Move the output position back to the input position."
  (with-accessors ((input-position %mb-input-position)
                   (output-position %mb-output-position))
      buffer
    (setf output-position input-position)))
480 ;;;-------------------------------------------------------------------------
482 ;;;-------------------------------------------------------------------------
(defmethod zstream-fill-input ((buffer single-channel-buffer) &key timeout)
  "Flush pending output, then refill the input, both with TIMEOUT.

Runs inside WITH-SYNCHRONIZED-BUFFER for :INPUT and returns the values of
%ZSTREAM-FILL-INPUT."
  (with-synchronized-buffer (buffer :input)
    ;; Input and output share one iobuf, so pending output has to be written
    ;; out before the iobuf can take input.
    (%zstream-flush-output buffer timeout)
    (%zstream-fill-input buffer timeout)))
(defmethod zstream-fill-input ((buffer dual-channel-buffer) &key timeout)
  "Call %ZSTREAM-FILL-INPUT with TIMEOUT inside WITH-SYNCHRONIZED-BUFFER for :INPUT."
  (with-synchronized-buffer (buffer :input)
    (%zstream-fill-input buffer timeout)))
(defmethod %zstream-fill-input ((buffer device-buffer) timeout)
  "Read from the device into the next empty zone of the input iobuf.

On success the iobuf's end is advanced past the new data and two values are
returned: the number of octets read and the space still available in the
input iobuf.  Signals END-OF-FILE, with BUFFER as the stream, at end of file."
  ;; NOTE(review): the WITH-ACCESSORS instance, the LET binding NBYTES and the
  ;; dispatch on the DEVICE-READ result were missing.  The rebuild assumes
  ;; ETYPECASE with an (EQL :EOF) clause -- %ZSTREAM-READ-VECTOR compares the
  ;; refill result with :EOF -- and an UNSIGNED-BYTE clause; confirm which
  ;; values DEVICE-READ can return (e.g. on timeout).
  (with-accessors ((device zstream-device)
                   (input-iobuf %db-input-iobuf))
      buffer
    (multiple-value-bind (data start end)
        (iobuf-next-empty-zone input-iobuf)
      (let ((nbytes
              (device-read device data :start start
                                       :end end :timeout timeout)))
        (etypecase nbytes
          ((eql :eof)
           (error 'end-of-file :stream buffer))
          (unsigned-byte
           (setf (iobuf-end input-iobuf) (+ start nbytes))
           (values nbytes (iobuf-available-space input-iobuf))))))))
(defmethod zstream-fill-input ((buffer memory-buffer) &key timeout)
  "Do nothing: a memory buffer has no device to read from.  Returns NIL."
  (declare (ignore buffer timeout))
  ;; NOTE(review): the result form was missing.  Both arguments are ignored,
  ;; so the result is a constant; NIL is assumed -- confirm.
  (values nil))
514 ;;;-------------------------------------------------------------------------
516 ;;;-------------------------------------------------------------------------
(defmethod zstream-flush-output ((buffer device-buffer) &key timeout)
  "Call %ZSTREAM-FLUSH-OUTPUT with TIMEOUT inside WITH-SYNCHRONIZED-BUFFER for :OUTPUT."
  (with-synchronized-buffer (buffer :output)
    (%zstream-flush-output buffer timeout)))
(defmethod %zstream-flush-output ((buffer device-buffer) timeout)
  "Write the next data zone of the output iobuf to the device.

Nothing is written, and NIL is returned, when there is no pending output.  On
success the iobuf's start is advanced past the written data and two values
are returned: the number of octets written and the octets still available in
the output iobuf.  Signals HANGUP, with BUFFER as the stream, when the device
reports a hangup."
  ;; NOTE(review): the WITH-ACCESSORS instance, the LET binding NBYTES and the
  ;; dispatch on the DEVICE-WRITE result were missing.  The rebuild assumes
  ;; ETYPECASE with an (EQL :HANGUP) clause, by analogy with the HANGUP
  ;; condition signalled, and an UNSIGNED-BYTE clause -- confirm.
  (with-accessors ((device zstream-device)
                   (output-iobuf %db-output-iobuf))
      buffer
    ;; %SCB-DIRTYP exists only on SINGLE-CHANNEL-BUFFER, yet this method runs
    ;; for every DEVICE-BUFFER, so flushing a DUAL-CHANNEL-BUFFER signalled
    ;; NO-APPLICABLE-METHOD.  A single-channel buffer shares one iobuf between
    ;; directions and therefore needs the dirty flag; any other buffer has
    ;; pending output whenever its output iobuf is not empty.
    (when (if (typep buffer 'single-channel-buffer)
              (%scb-dirtyp buffer)
              (not (iobuf-empty-p output-iobuf)))
      (multiple-value-bind (data start end)
          (iobuf-next-data-zone output-iobuf)
        (let ((nbytes
                (device-write device data :start start
                                          :end end :timeout timeout)))
          (etypecase nbytes
            ((eql :hangup)
             (error 'hangup :stream buffer))
            (unsigned-byte
             (setf (iobuf-start output-iobuf) (+ start nbytes))
             (values nbytes (iobuf-available-octets output-iobuf)))))))))
(defmethod %zstream-flush-output :after
    ((buffer single-channel-buffer) timeout)
  "Clear BUFFER's dirty flag once the output iobuf has been emptied."
  (declare (ignore timeout))
  (with-accessors ((output-iobuf %db-output-iobuf)
                   (dirtyp %scb-dirtyp))
      buffer
    (when (iobuf-empty-p output-iobuf)
      (setf dirtyp nil))))
(defmethod zstream-flush-output ((buffer memory-buffer) &key timeout)
  "Do nothing: a memory buffer has no device to write to.  Returns NIL."
  (declare (ignore buffer timeout))
  ;; NOTE(review): the result form was missing.  Both arguments are ignored,
  ;; so the result is a constant; NIL is assumed -- confirm.
  (values nil))
549 ;;;-------------------------------------------------------------------------
550 ;;; MEMORY-BUFFER GROW
551 ;;;-------------------------------------------------------------------------
(defmethod %ensure-memory-buffer-capacity
    ((buffer memory-buffer) &optional (amount 1))
  "Enlarge BUFFER's data vector when AMOUNT more elements call for it.

The size needed is the output position plus AMOUNT.  When ADJUST-THRESHOLD
times that size, rounded up, exceeds the vector's length, the vector is
replaced by one of ADJUST-SIZE times the size needed, truncated."
  (check-type amount unsigned-byte)
  (with-accessors ((data-vector %mb-data-vector)
                   (output-position %mb-output-position)
                   (adjust-size %mb-adjust-size)
                   (adjust-threshold %mb-adjust-threshold))
      ;; The instance form was missing.
      buffer
    (let* ((size-needed (+ output-position amount))
           (threshold (ceiling (* adjust-threshold size-needed))))
      ;; NOTE(review): with ADJUST-THRESHOLD below 1 this test can pass while
      ;; SIZE-NEEDED already exceeds the vector's length -- confirm intent.
      (when (> threshold (length data-vector))
        ;; The data vector is not created adjustable, so ADJUST-ARRAY may
        ;; return a fresh array; its result has to be stored back.  The
        ;; storing form was missing (the paren depth shows one wrapper
        ;; around the ADJUST-ARRAY call).
        (setf data-vector
              (adjust-array data-vector
                            (truncate (* adjust-size size-needed))))))))
(defmethod %check-memory-buffer-available-data
    ((buffer memory-buffer) &optional (amount 1))
  "Signal an error unless BUFFER holds at least AMOUNT unread elements.

The unread data is what lies between the input and output positions.
Signals END-OF-FILE, with BUFFER as the stream, when there is none at all,
and a simple error when there is some but less than AMOUNT."
  (check-type amount positive-integer)
  (with-accessors ((input-position %mb-input-position)
                   (output-position %mb-output-position))
      ;; The instance form and the COND were missing; the COND is implied by
      ;; the two surviving (test form) clauses and the paren depth.
      buffer
    (let ((available-data (- output-position input-position)))
      (check-type available-data unsigned-byte)
      (cond
        ((zerop available-data)
         (error 'end-of-file :stream buffer))
        ((< available-data amount)
         ;; FIXME: signal proper condition, soft EOF
         (error "~S elements requested, only ~S available"
                amount available-data))))))
583 ;;;-------------------------------------------------------------------------
585 ;;;-------------------------------------------------------------------------
(defmethod zstream-poll ((buffer device-buffer) &key direction timeout)
  "Return the result of DEVICE-POLL on BUFFER's device for DIRECTION and TIMEOUT."
  (let ((device (zstream-device buffer)))
    (device-poll device direction timeout)))
590 (defmethod zstream-poll ((buffer memory-buffer
) &key direction timeout
)
591 (declare (ignore timeout
))
593 (:input
(< (%mb-input-position buffer
)
594 (%mb-output-position buffer
)))