1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*-
6 (in-package :io.zeta-streams
)
8 ;;;-------------------------------------------------------------------------
10 ;;;-------------------------------------------------------------------------
15 (defclass device-buffer
(buffer)
16 ((synchronized :initarg
:synchronized
17 :reader %db-synchronized-p
)
18 (device :initarg
:device
20 (input-iobuf :initarg
:input-buffer
21 :accessor %db-input-iobuf
)
22 (output-iobuf :initarg
:output-buffer
23 :accessor %db-output-iobuf
)
24 (buffering :initarg
:buffering
25 :accessor %db-buffering
))
26 (:default-initargs
:synchronized nil
))
28 (defclass memory-buffer
(buffer)
29 ((data-vector :accessor %mb-data-vector
)
30 (element-type :accessor %mb-element-type
)
31 (input-position :initform
0
32 :accessor %mb-input-position
)
33 (output-position :initform
0
34 :accessor %mb-output-position
)
35 (adjust-size :accessor %mb-adjust-size
)
36 (adjust-threshold :accessor %mb-adjust-threshold
)))
39 ((external-format :accessor %zs-external-format
)))
41 (defclass device-zstream
(device-buffer zstream
)
44 (defclass single-channel-zstream
(device-zstream)
45 ((dirtyp :initform nil
46 :accessor %sczs-dirtyp
)))
48 (defclass dual-channel-zstream
(device-zstream)
51 (defclass memory-zstream
(memory-buffer zstream
)
54 (defclass octet-memory-zstream
(memory-zstream)
56 (:default-initargs
:element-type
'octet
))
58 (defclass character-memory-zstream
(memory-zstream)
60 (:default-initargs
:element-type
'character
))
63 ;;;-------------------------------------------------------------------------
65 ;;;-------------------------------------------------------------------------
69 (defgeneric zstream-synchronized-p
(stream))
71 (defgeneric zstream-device
(stream))
73 (defgeneric (setf zstream-device
) (new-device stream
))
75 (defgeneric zstream-external-format
(stream))
77 (defgeneric (setf zstream-external-format
) (external-format stream
))
81 (defgeneric zstream-read-element
(stream &key timeout
))
83 (defgeneric zstream-write-element
(stream element
&key timeout
))
85 (defgeneric zstream-read-vector
(stream vector
&key start end timeout
))
87 (defgeneric zstream-write-vector
(stream vector
&key start end timeout
))
89 (defgeneric zstream-read-byte
(stream &key width signed
))
91 (defgeneric zstream-write-byte
(stream byte
&key width signed
))
93 (defgeneric zstream-read-char
(stream &key eof-error-p eof-value
))
95 (defgeneric zstream-write-char
(stream char
&key hangup-error-p hangup-value
))
97 (defgeneric zstream-read-line
(stream &key eof-error-p eof-value
))
99 (defgeneric zstream-write-line
(stream line
&key start end hangup-error-p hangup-value
))
101 ;;; Device zstream functions
103 (defgeneric zstream-position
(stream &key direction
))
105 (defgeneric (setf zstream-position
) (position stream
&key direction from
))
107 (defgeneric zstream-poll
(stream &key direction timeout
))
109 (defgeneric zstream-fill
(stream &key timeout
))
111 (defgeneric zstream-flush
(stream &key timeout
))
113 (defgeneric zstream-clear-input
(stream))
115 (defgeneric zstream-clear-output
(stream))
117 ;;; Internal functions
119 (defgeneric %zstream-read-vector
(stream vector start end timeout
))
121 (defgeneric %zstream-write-vector
(stream vector start end timeout
))
123 (defgeneric %zstream-fill
(stream timeout
))
125 (defgeneric %zstream-flush
(stream timeout
))
127 (defgeneric %zstream-clear-input
(stream))
129 (defgeneric %zstream-clear-output
(stream))
131 ;; FIXME: choose better name
132 (defgeneric %ensure-buffer-capacity
(stream &optional amount
))
134 ;; FIXME: choose better name
135 (defgeneric %check-buffer-available-data
(stream &optional amount
))
138 ;;;-------------------------------------------------------------------------
140 ;;;-------------------------------------------------------------------------
142 (defmethod zstream-synchronized-p ((stream device-zstream
))
143 (%db-synchronized-p stream
))
145 (defmethod zstream-synchronized-p ((stream memory-zstream
))
146 (declare (ignore stream
))
149 (defmethod zstream-device ((stream device-zstream
))
152 (defmethod zstream-device ((stream memory-zstream
))
153 (declare (ignore stream
))
156 (defmethod (setf zstream-device
) (new-device (stream device-zstream
))
157 (setf (%db-device stream
) new-device
))
159 (defmethod (setf zstream-device
) (new-device (stream memory-zstream
))
160 (declare (ignore new-device stream
))
163 (defmethod zstream-external-format ((stream zstream
))
164 (%zs-external-format stream
))
166 (defmethod (setf zstream-external-format
)
167 (external-format (stream zstream
))
168 (setf (%zs-external-format stream
)
169 (babel:ensure-external-format external-format
)))
172 ;;;-------------------------------------------------------------------------
174 ;;;-------------------------------------------------------------------------
176 (defmethod shared-initialize :after
177 ((stream single-channel-zstream
) slot-names
178 &key data size buffering
)
179 (declare (ignore slot-names
))
180 (with-accessors ((device zstream-device
)
181 (input-iobuf %db-input-iobuf
)
182 (output-iobuf %db-output-iobuf
))
184 (check-type device device
)
185 (check-type data
(or null iobuf
))
186 (check-type buffering stream-buffering
)
187 (setf input-iobuf
(or data
(make-iobuf size
))
188 output-iobuf input-iobuf
)))
190 (defmethod shared-initialize :after
191 ((stream dual-channel-zstream
) slot-names
192 &key input-data output-data input-size output-size buffering
)
193 (declare (ignore slot-names
))
194 (with-accessors ((device zstream-device
)
195 (input-iobuf %db-input-iobuf
)
196 (output-iobuf %db-output-iobuf
))
198 (check-type device device
)
199 (check-type input-data
(or null iobuf
))
200 (check-type output-data
(or null iobuf
))
201 (check-type buffering stream-buffering
)
202 (setf input-iobuf
(or input-data
(make-iobuf input-size
)))
203 (setf output-iobuf
(or output-data
(make-iobuf output-size
)))))
205 (defmethod shared-initialize :after
206 ((stream memory-zstream
) slot-names
207 &key data
(start 0) end
(element-type t
)
208 (adjust-size 1.5) (adjust-threshold 1))
209 (declare (ignore slot-names
))
210 (check-type adjust-size
(real 1.001))
211 (check-type adjust-threshold
(real 0.1 1))
212 (setf (%mb-adjust-size stream
) adjust-size
213 (%mb-adjust-threshold stream
) adjust-threshold
214 (%mb-element-type stream
) (upgraded-array-element-type
218 (check-bounds data start end
)
220 ;; FIXME: signal proper condition
221 (assert (subtypep element-type
(array-element-type data
))))
222 (setf (%mb-data-vector stream
)
223 (make-array (truncate (* adjust-size
(length data
)))
224 :element-type
(or element-type
225 (array-element-type data
))))
226 (setf (%mb-output-position stream
) (- end start
))
227 (replace (%mb-data-vector stream
) data
:start2 start
:end2 end
))
229 (setf (%mb-data-vector stream
)
230 (make-array 128 :element-type element-type
)))))
232 (defmethod shared-initialize :after
((stream zstream
) slot-names
233 &key
(external-format :default
))
234 (declare (ignore slot-names
))
235 (setf (zstream-external-format stream
) external-format
))
237 (defun make-memory-zstream (&key data
(start 0) end
(element-type t
)
238 (adjust-size 1.5) (adjust-threshold 1))
239 (let ((element-type (upgraded-array-element-type element-type
)))
241 ((subtypep element-type
'octet
)
242 (make-instance 'octet-memory-zstream
243 :data data
:start start
:end end
244 :adjust-size adjust-size
245 :adjust-threshold adjust-threshold
))
246 ((subtypep element-type
'character
)
247 (make-instance 'character-memory-zstream
248 :data data
:start start
:end end
249 :adjust-size adjust-size
250 :adjust-threshold adjust-threshold
))
251 ((subtypep element-type
't
)
252 (make-instance 'memory-zstream
253 :data data
:start start
:end end
254 :element-type element-type
255 :adjust-size adjust-size
256 :adjust-threshold adjust-threshold
))
258 (error 'subtype-error
:datum element-type
259 :expected-supertype
'(or (unsigned-byte 8) character t
))))))
262 ;;;-------------------------------------------------------------------------
264 ;;;-------------------------------------------------------------------------
266 ;; FIXME: synchronize memory streams too ?
267 (defmacro with-synchronized-device-zstream
268 ((stream &optional direction
) &body body
)
269 (with-gensyms (body-fun)
270 (labels ((make-locks (body direction
)
274 ((iobuf-lock (%db-input-iobuf
,stream
)))
278 ((iobuf-lock (%db-output-iobuf
,stream
)))
281 (make-locks (make-locks body
:output
) :input
)))))
282 `(flet ((,body-fun
() ,@body
))
283 (declare (dynamic-extent #',body-fun
))
284 (if (zstream-synchronized-p ,stream
)
285 ,(make-locks `(,body-fun
) direction
)
289 ;;;-------------------------------------------------------------------------
291 ;;;-------------------------------------------------------------------------
293 (defmethod relinquish :after
((stream single-channel-zstream
) &key abort
)
294 (with-synchronized-device-zstream (stream :input
)
296 (%zstream-flush stream
0))
297 (relinquish (zstream-device stream
) :abort abort
))
300 (defmethod relinquish :after
((stream dual-channel-zstream
) &key abort
)
301 (with-synchronized-device-zstream (stream :io
)
303 (%zstream-flush stream
0))
304 (relinquish (zstream-device stream
) :abort abort
))
308 ;;;-------------------------------------------------------------------------
310 ;;;-------------------------------------------------------------------------
312 (defmethod zstream-read-element ((stream device-zstream
) &key timeout
)
313 (let ((v (make-array 1 :element-type
'octet
)))
314 (declare (dynamic-extent v
))
315 (zstream-read-vector stream v
:timeout timeout
)
318 (defmethod zstream-read-element ((stream memory-zstream
) &key timeout
)
319 (declare (ignore timeout
))
320 (let ((v (make-array 1 :element-type
(%mb-element-type stream
))))
321 (declare (dynamic-extent v
))
322 (zstream-read-vector stream v
)
326 ;;;-------------------------------------------------------------------------
328 ;;;-------------------------------------------------------------------------
330 (defmethod zstream-read-vector :around
((stream zstream
) vector
&key
331 (start 0) end timeout
)
332 (check-bounds vector start end
)
333 (when (= start end
) (return* 0))
334 (call-next-method stream vector
:start start
:end end
:timeout timeout
))
336 (defmethod zstream-read-vector ((stream single-channel-zstream
) vector
337 &key start end timeout
)
338 (with-synchronized-device-zstream (stream :input
)
339 (%zstream-read-vector stream vector start end timeout
)))
341 (defmethod zstream-read-vector ((stream dual-channel-zstream
) vector
342 &key start end timeout
)
343 (with-synchronized-device-zstream (stream :input
)
344 (%zstream-read-vector stream vector start end timeout
)))
346 (defmethod %zstream-read-vector
((stream device-zstream
) vector
348 (with-accessors ((input-iobuf %db-input-iobuf
))
351 ((iobuf-empty-p input-iobuf
)
352 (let ((nbytes (%zstream-fill stream timeout
)))
353 (if (iobuf-empty-p input-iobuf
)
354 (if (eql :eof nbytes
) :eof
0)
355 (iobuf->vector input-iobuf vector start end
))))
357 (iobuf->vector input-iobuf vector start end
)))))
359 (defmethod zstream-read-vector ((stream memory-zstream
) vector
360 &key start end timeout
)
361 (declare (ignore timeout
))
362 (with-accessors ((data-vector %mb-data-vector
)
363 (input-position %mb-input-position
)
364 (output-position %mb-output-position
))
366 (%check-buffer-available-data stream
1)
367 (replace vector data-vector
368 :start1 input-position
:end1 output-position
369 :start2 start
:end2 end
)
370 (incf input-position
(min (- output-position input-position
)
374 ;;;-------------------------------------------------------------------------
376 ;;;-------------------------------------------------------------------------
378 (defmethod zstream-write-element ((stream device-zstream
) octet
&key timeout
)
379 (check-type octet octet
)
380 (let ((v (make-array 1 :element-type
'octet
:initial-contents octet
)))
381 (declare (dynamic-extent v
))
382 (zstream-write-vector stream v
:timeout timeout
)))
384 (defmethod zstream-write-element ((stream memory-zstream
) element
&key timeout
)
385 (declare (ignore timeout
))
386 (let ((v (make-array 1 :element-type
(%mb-element-type stream
)
387 :initial-contents element
)))
388 (declare (dynamic-extent v
))
389 (zstream-write-vector stream v
)))
392 ;;;-------------------------------------------------------------------------
394 ;;;-------------------------------------------------------------------------
396 (defmethod zstream-write-vector :around
((stream zstream
) vector
397 &key
(start 0) end timeout
)
398 (check-bounds vector start end
)
399 (when (= start end
) (return* 0))
400 (call-next-method stream vector
:start start
:end end
:timeout timeout
))
402 (defmethod zstream-write-vector ((stream single-channel-zstream
) vector
403 &key start end timeout
)
404 (with-synchronized-device-zstream (stream :output
)
405 ;; If the previous operation was a read, flush the read buffer
406 ;; and reposition the file offset accordingly
407 (%zstream-clear-input stream
)
408 (%zstream-write-vector stream vector start end timeout
)))
410 (defmethod zstream-write-vector ((stream dual-channel-zstream
) vector
411 &key start end timeout
)
412 (with-synchronized-device-zstream (stream :output
)
413 (%zstream-write-vector stream vector start end timeout
)))
415 (defmethod %zstream-write-vector
((stream device-zstream
) vector start end timeout
)
416 (with-accessors ((output-iobuf %db-output-iobuf
))
418 (multiple-value-prog1
419 (vector->iobuf output-iobuf vector start end
)
420 (when (iobuf-full-p output-iobuf
)
421 (%zstream-flush stream timeout
)))))
423 (defmethod %zstream-write-vector
:after
((stream single-channel-zstream
)
424 vector start end timeout
)
425 (declare (ignore vector start end timeout
))
426 (setf (%sczs-dirtyp stream
) t
))
428 (defmethod zstream-write-vector ((stream memory-zstream
) vector
429 &key
(start 0) end timeout
)
430 (declare (ignore timeout
))
431 (with-accessors ((data-vector %mb-data-vector
)
432 (output-position %mb-output-position
))
434 (%ensure-buffer-capacity stream
(length vector
))
435 (replace data-vector vector
:start1 output-position
436 :start2 start
:end2 end
)
437 (incf output-position
(length vector
))))
440 ;;;-------------------------------------------------------------------------
442 ;;;-------------------------------------------------------------------------
444 (defmethod zstream-position ((stream single-channel-zstream
) &key direction
)
445 (declare (ignore direction
))
446 (with-synchronized-device-zstream (stream :input
)
447 (let ((position (device-position (zstream-device stream
))))
448 ;; FIXME: signal proper condition
449 (assert (not (null position
)) (position)
450 "A single-channel-zstream's device must not return a NULL device-position.")
451 (if (%sczs-dirtyp stream
)
452 (+ position
(iobuf-available-octets (%db-output-iobuf stream
)))
453 (- position
(iobuf-available-octets (%db-input-iobuf stream
)))))))
455 (defmethod zstream-position ((stream dual-channel-zstream
) &key direction
)
456 (declare (ignore direction
))
457 (with-synchronized-device-zstream (stream :io
)
458 (device-position (zstream-device stream
))))
460 (defmethod zstream-position ((stream memory-zstream
) &key direction
)
462 (:input
(%mb-input-position stream
))
463 (:output
(%mb-output-position stream
))))
466 ;;;-------------------------------------------------------------------------
468 ;;;-------------------------------------------------------------------------
470 (defmethod (setf zstream-position
)
471 (position (stream device-zstream
) &key direction
(from :start
))
472 (declare (ignore direction
))
473 (with-synchronized-device-zstream (stream :input
)
474 (setf (%db-position stream from
) position
)))
476 (defun (setf %db-position
) (position stream from
)
477 (setf (device-position (zstream-device stream
) from
) position
))
479 (defmethod (setf zstream-position
)
480 (offset (stream memory-zstream
) &key direction
(from :start
))
481 (with-accessors ((data-vector %mb-data-vector
)
482 (input-position %mb-input-position
)
483 (output-position %mb-output-position
))
490 (:current
(+ input-position offset
))
491 (:output
(+ output-position offset
)))))
492 (check-bounds data-vector newpos output-position
)
493 (setf input-position newpos
)))
498 (:current
(+ output-position offset
))
499 (:input
(+ input-position offset
)))))
500 (%ensure-buffer-capacity stream
(- newpos output-position
))
501 (setf output-position newpos
))))))
504 ;;;-------------------------------------------------------------------------
506 ;;;-------------------------------------------------------------------------
508 (defmethod zstream-clear-input ((stream device-zstream
))
509 (with-synchronized-device-zstream (stream :input
)
510 (%zstream-clear-input stream
)))
512 (defmethod %zstream-clear-input
((stream single-channel-zstream
))
513 (unless (%sczs-dirtyp stream
)
514 (let ((nbytes (iobuf-available-octets (%db-input-iobuf stream
))))
515 (unless (zerop nbytes
)
516 (setf (%db-position stream
:current
) (- nbytes
)))
517 (iobuf-reset (%db-input-iobuf stream
)))))
519 (defmethod %zstream-clear-input
((stream dual-channel-zstream
))
520 (iobuf-reset (%db-input-iobuf stream
)))
522 (defmethod zstream-clear-input ((stream memory-zstream
))
523 (setf (%mb-input-position stream
) (%mb-output-position stream
)))
526 ;;;-------------------------------------------------------------------------
528 ;;;-------------------------------------------------------------------------
530 (defmethod zstream-clear-output ((stream device-zstream
))
531 (with-synchronized-device-zstream (stream :output
)
532 (%zstream-clear-output stream
)))
534 (defmethod %zstream-clear-output
((stream single-channel-zstream
))
535 (when (%sczs-dirtyp stream
)
536 (iobuf-reset (%db-output-iobuf stream
))))
538 (defmethod %zstream-clear-output
((stream dual-channel-zstream
))
539 (iobuf-reset (%db-output-iobuf stream
)))
541 (defmethod zstream-clear-output ((stream memory-zstream
))
542 (setf (%mb-output-position stream
) (%mb-input-position stream
)))
545 ;;;-------------------------------------------------------------------------
547 ;;;-------------------------------------------------------------------------
549 (defmethod zstream-fill ((stream single-channel-zstream
) &key timeout
)
550 (with-synchronized-device-zstream (stream :input
)
551 (%zstream-flush stream timeout
)
552 (%zstream-fill stream timeout
)))
554 (defmethod zstream-fill ((stream dual-channel-zstream
) &key timeout
)
555 (with-synchronized-device-zstream (stream :input
)
556 (%zstream-fill stream timeout
)))
558 (defmethod %zstream-fill
((stream device-zstream
) timeout
)
559 (with-accessors ((device zstream-device
)
560 (input-iobuf %db-input-iobuf
))
562 (multiple-value-bind (data start end
)
563 (iobuf-next-empty-zone input-iobuf
)
565 (device-read device data
:start start
566 :end end
:timeout timeout
)))
569 (error 'end-of-file
:stream stream
))
571 (setf (iobuf-end input-iobuf
) (+ start nbytes
))
572 (values nbytes
(iobuf-available-space input-iobuf
))))))))
574 (defmethod zstream-fill ((stream memory-zstream
) &key timeout
)
575 (declare (ignore stream timeout
))
579 ;;;-------------------------------------------------------------------------
581 ;;;-------------------------------------------------------------------------
583 (defmethod zstream-flush ((stream device-zstream
) &key timeout
)
584 (with-synchronized-device-zstream (stream :output
)
585 (%zstream-flush stream timeout
)))
587 (defmethod %zstream-flush
((stream device-zstream
) timeout
)
588 (with-accessors ((device zstream-device
)
589 (output-iobuf %db-output-iobuf
))
591 (when (%sczs-dirtyp stream
)
592 (multiple-value-bind (data start end
)
593 (iobuf-next-data-zone output-iobuf
)
595 (device-write device data
:start start
596 :end end
:timeout timeout
)))
599 (error 'hangup
:stream stream
))
601 (setf (iobuf-start output-iobuf
) (+ start nbytes
))
602 (values nbytes
(iobuf-available-octets output-iobuf
)))))))))
604 (defmethod %zstream-flush
:after
((stream single-channel-zstream
) timeout
)
605 (declare (ignore timeout
))
606 (when (iobuf-empty-p (%db-output-iobuf stream
))
607 (setf (%sczs-dirtyp stream
) nil
)))
609 (defmethod zstream-flush ((stream memory-zstream
) &key timeout
)
610 (declare (ignore stream timeout
))
614 ;;;-------------------------------------------------------------------------
615 ;;; MEMORY-ZSTREAM GROW
616 ;;;-------------------------------------------------------------------------
618 (defmethod %ensure-buffer-capacity
619 ((stream memory-zstream
) &optional
(amount 1))
620 (check-type amount unsigned-byte
)
621 (with-accessors ((data-vector %mb-data-vector
)
622 (output-position %mb-output-position
)
623 (adjust-size %mb-adjust-size
)
624 (adjust-threshold %mb-adjust-threshold
))
626 (let* ((size-needed (+ output-position amount
))
627 (threshold (ceiling (* adjust-threshold size-needed
))))
628 (when (> threshold
(length data-vector
))
630 (adjust-array data-vector
631 (truncate (* adjust-size size-needed
))))))))
633 (defmethod %check-buffer-available-data
634 ((stream memory-zstream
) &optional
(amount 1))
635 (check-type amount positive-integer
)
636 (with-accessors ((input-position %mb-input-position
)
637 (output-position %mb-output-position
))
639 (let ((available-data (- output-position input-position
)))
640 (check-type available-data unsigned-byte
)
642 ((zerop available-data
)
643 (error 'end-of-file
:stream stream
))
644 ((< available-data amount
)
645 ;; FIXME: signal proper condition, soft EOF
646 (error "~S elements requested, only ~S available"
647 amount available-data
))))))
650 ;;;-------------------------------------------------------------------------
652 ;;;-------------------------------------------------------------------------
654 (defmethod zstream-poll ((stream device-zstream
) &key direction timeout
)
655 (device-poll (zstream-device stream
) direction timeout
))
657 (defmethod zstream-poll ((stream memory-zstream
) &key direction timeout
)
658 (declare (ignore timeout
))
660 (:input
(< (%mb-input-position stream
)
661 (%mb-output-position stream
)))