Remove a few buffer classes, misc cleanup.
[iolib.git] / io.streams / zeta / stream.lisp
blobd6a5875475dc7611686eee4dea9d0338b569ce2b
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*-
2 ;;;
3 ;;; --- Zeta Streams.
4 ;;;
6 (in-package :io.zeta-streams)
8 ;;;-------------------------------------------------------------------------
9 ;;; Classes and Types
10 ;;;-------------------------------------------------------------------------
12 (defclass buffer ()
13 ())
15 (defclass device-buffer (buffer)
16 ((synchronized :initarg :synchronized
17 :reader %db-synchronized-p)
18 (device :initarg :device
19 :accessor %db-device)
20 (input-iobuf :initarg :input-buffer
21 :accessor %db-input-iobuf)
22 (output-iobuf :initarg :output-buffer
23 :accessor %db-output-iobuf)
24 (buffering :initarg :buffering
25 :accessor %db-buffering))
26 (:default-initargs :synchronized nil))
28 (defclass memory-buffer (buffer)
29 ((data-vector :accessor %mb-data-vector)
30 (element-type :accessor %mb-element-type)
31 (input-position :initform 0
32 :accessor %mb-input-position)
33 (output-position :initform 0
34 :accessor %mb-output-position)
35 (adjust-size :accessor %mb-adjust-size)
36 (adjust-threshold :accessor %mb-adjust-threshold)))
38 (defclass zstream ()
39 ((external-format :accessor %zs-external-format)))
41 (defclass device-zstream (device-buffer zstream)
42 ())
44 (defclass single-channel-zstream (device-zstream)
45 ((dirtyp :initform nil
46 :accessor %sczs-dirtyp)))
48 (defclass dual-channel-zstream (device-zstream)
49 ())
51 (defclass memory-zstream (memory-buffer zstream)
52 ())
54 (defclass octet-memory-zstream (memory-zstream)
56 (:default-initargs :element-type 'octet))
58 (defclass character-memory-zstream (memory-zstream)
60 (:default-initargs :element-type 'character))
63 ;;;-------------------------------------------------------------------------
64 ;;; Generic Functions
65 ;;;-------------------------------------------------------------------------
67 ;;; Accessors
69 (defgeneric zstream-synchronized-p (stream))
71 (defgeneric zstream-device (stream))
73 (defgeneric (setf zstream-device) (new-device stream))
75 (defgeneric zstream-external-format (stream))
77 (defgeneric (setf zstream-external-format) (external-format stream))
79 ;;; I/O functions
81 (defgeneric zstream-read-element (stream &key timeout))
83 (defgeneric zstream-write-element (stream element &key timeout))
85 (defgeneric zstream-read-vector (stream vector &key start end timeout))
87 (defgeneric zstream-write-vector (stream vector &key start end timeout))
89 (defgeneric zstream-read-byte (stream &key width signed))
91 (defgeneric zstream-write-byte (stream byte &key width signed))
93 (defgeneric zstream-read-char (stream &key eof-error-p eof-value))
95 (defgeneric zstream-write-char (stream char &key hangup-error-p hangup-value))
97 (defgeneric zstream-read-line (stream &key eof-error-p eof-value))
99 (defgeneric zstream-write-line (stream line &key start end hangup-error-p hangup-value))
101 ;;; Device zstream functions
103 (defgeneric zstream-position (stream &key direction))
105 (defgeneric (setf zstream-position) (position stream &key direction from))
107 (defgeneric zstream-poll (stream &key direction timeout))
109 (defgeneric zstream-fill (stream &key timeout))
111 (defgeneric zstream-flush (stream &key timeout))
113 (defgeneric zstream-clear-input (stream))
115 (defgeneric zstream-clear-output (stream))
117 ;;; Internal functions
119 (defgeneric %zstream-read-vector (stream vector start end timeout))
121 (defgeneric %zstream-write-vector (stream vector start end timeout))
123 (defgeneric %zstream-fill (stream timeout))
125 (defgeneric %zstream-flush (stream timeout))
127 (defgeneric %zstream-clear-input (stream))
129 (defgeneric %zstream-clear-output (stream))
131 ;; FIXME: choose better name
132 (defgeneric %ensure-buffer-capacity (stream &optional amount))
134 ;; FIXME: choose better name
135 (defgeneric %check-buffer-available-data (stream &optional amount))
138 ;;;-------------------------------------------------------------------------
139 ;;; Accessors
140 ;;;-------------------------------------------------------------------------
142 (defmethod zstream-synchronized-p ((stream device-zstream))
143 (%db-synchronized-p stream))
145 (defmethod zstream-synchronized-p ((stream memory-zstream))
146 (declare (ignore stream))
147 (values nil))
149 (defmethod zstream-device ((stream device-zstream))
150 (%db-device stream))
152 (defmethod zstream-device ((stream memory-zstream))
153 (declare (ignore stream))
154 (values nil))
156 (defmethod (setf zstream-device) (new-device (stream device-zstream))
157 (setf (%db-device stream) new-device))
159 (defmethod (setf zstream-device) (new-device (stream memory-zstream))
160 (declare (ignore new-device stream))
161 (values nil))
163 (defmethod zstream-external-format ((stream zstream))
164 (%zs-external-format stream))
166 (defmethod (setf zstream-external-format)
167 (external-format (stream zstream))
168 (setf (%zs-external-format stream)
169 (babel:ensure-external-format external-format)))
172 ;;;-------------------------------------------------------------------------
173 ;;; Constructors
174 ;;;-------------------------------------------------------------------------
176 (defmethod shared-initialize :after
177 ((stream single-channel-zstream) slot-names
178 &key data size buffering)
179 (declare (ignore slot-names))
180 (with-accessors ((device zstream-device)
181 (input-iobuf %db-input-iobuf)
182 (output-iobuf %db-output-iobuf))
183 stream
184 (check-type device device)
185 (check-type data (or null iobuf))
186 (check-type buffering stream-buffering)
187 (setf input-iobuf (or data (make-iobuf size))
188 output-iobuf input-iobuf)))
190 (defmethod shared-initialize :after
191 ((stream dual-channel-zstream) slot-names
192 &key input-data output-data input-size output-size buffering)
193 (declare (ignore slot-names))
194 (with-accessors ((device zstream-device)
195 (input-iobuf %db-input-iobuf)
196 (output-iobuf %db-output-iobuf))
197 stream
198 (check-type device device)
199 (check-type input-data (or null iobuf))
200 (check-type output-data (or null iobuf))
201 (check-type buffering stream-buffering)
202 (setf input-iobuf (or input-data (make-iobuf input-size)))
203 (setf output-iobuf (or output-data (make-iobuf output-size)))))
205 (defmethod shared-initialize :after
206 ((stream memory-zstream) slot-names
207 &key data (start 0) end (element-type t)
208 (adjust-size 1.5) (adjust-threshold 1))
209 (declare (ignore slot-names))
210 (check-type adjust-size (real 1.001))
211 (check-type adjust-threshold (real 0.1 1))
212 (setf (%mb-adjust-size stream) adjust-size
213 (%mb-adjust-threshold stream) adjust-threshold
214 (%mb-element-type stream) (upgraded-array-element-type
215 element-type))
216 (cond
217 (data
218 (check-bounds data start end)
219 (when element-type
220 ;; FIXME: signal proper condition
221 (assert (subtypep element-type (array-element-type data))))
222 (setf (%mb-data-vector stream)
223 (make-array (truncate (* adjust-size (length data)))
224 :element-type (or element-type
225 (array-element-type data))))
226 (setf (%mb-output-position stream) (- end start))
227 (replace (%mb-data-vector stream) data :start2 start :end2 end))
229 (setf (%mb-data-vector stream)
230 (make-array 128 :element-type element-type)))))
232 (defmethod shared-initialize :after ((stream zstream) slot-names
233 &key (external-format :default))
234 (declare (ignore slot-names))
235 (setf (zstream-external-format stream) external-format))
237 (defun make-memory-zstream (&key data (start 0) end (element-type t)
238 (adjust-size 1.5) (adjust-threshold 1))
239 (let ((element-type (upgraded-array-element-type element-type)))
240 (cond
241 ((subtypep element-type 'octet)
242 (make-instance 'octet-memory-zstream
243 :data data :start start :end end
244 :adjust-size adjust-size
245 :adjust-threshold adjust-threshold))
246 ((subtypep element-type 'character)
247 (make-instance 'character-memory-zstream
248 :data data :start start :end end
249 :adjust-size adjust-size
250 :adjust-threshold adjust-threshold))
251 ((subtypep element-type 't)
252 (make-instance 'memory-zstream
253 :data data :start start :end end
254 :element-type element-type
255 :adjust-size adjust-size
256 :adjust-threshold adjust-threshold))
258 (error 'subtype-error :datum element-type
259 :expected-supertype '(or (unsigned-byte 8) character t))))))
262 ;;;-------------------------------------------------------------------------
263 ;;; Helper macros
264 ;;;-------------------------------------------------------------------------
266 ;; FIXME: synchronize memory streams too ?
267 (defmacro with-synchronized-device-zstream
268 ((stream &optional direction) &body body)
269 (with-gensyms (body-fun)
270 (labels ((make-locks (body direction)
271 (ecase direction
272 (:input
273 `(bt:with-lock-held
274 ((iobuf-lock (%db-input-iobuf ,stream)))
275 ,body))
276 (:output
277 `(bt:with-lock-held
278 ((iobuf-lock (%db-output-iobuf ,stream)))
279 ,body))
280 (:io
281 (make-locks (make-locks body :output) :input)))))
282 `(flet ((,body-fun () ,@body))
283 (declare (dynamic-extent #',body-fun))
284 (if (zstream-synchronized-p ,stream)
285 ,(make-locks `(,body-fun) direction)
286 (,body-fun))))))
289 ;;;-------------------------------------------------------------------------
290 ;;; RELINQUISH
291 ;;;-------------------------------------------------------------------------
293 (defmethod relinquish :after ((stream single-channel-zstream) &key abort)
294 (with-synchronized-device-zstream (stream :input)
295 (unless abort
296 (%zstream-flush stream 0))
297 (relinquish (zstream-device stream) :abort abort))
298 (values stream))
300 (defmethod relinquish :after ((stream dual-channel-zstream) &key abort)
301 (with-synchronized-device-zstream (stream :io)
302 (unless abort
303 (%zstream-flush stream 0))
304 (relinquish (zstream-device stream) :abort abort))
305 (values stream))
308 ;;;-------------------------------------------------------------------------
309 ;;; READ-ELEMENT
310 ;;;-------------------------------------------------------------------------
312 (defmethod zstream-read-element ((stream device-zstream) &key timeout)
313 (let ((v (make-array 1 :element-type 'octet)))
314 (declare (dynamic-extent v))
315 (zstream-read-vector stream v :timeout timeout)
316 (aref v 0)))
318 (defmethod zstream-read-element ((stream memory-zstream) &key timeout)
319 (declare (ignore timeout))
320 (let ((v (make-array 1 :element-type (%mb-element-type stream))))
321 (declare (dynamic-extent v))
322 (zstream-read-vector stream v)
323 (aref v 0)))
326 ;;;-------------------------------------------------------------------------
327 ;;; READ-VECTOR
328 ;;;-------------------------------------------------------------------------
330 (defmethod zstream-read-vector :around ((stream zstream) vector &key
331 (start 0) end timeout)
332 (check-bounds vector start end)
333 (when (= start end) (return* 0))
334 (call-next-method stream vector :start start :end end :timeout timeout))
336 (defmethod zstream-read-vector ((stream single-channel-zstream) vector
337 &key start end timeout)
338 (with-synchronized-device-zstream (stream :input)
339 (%zstream-read-vector stream vector start end timeout)))
341 (defmethod zstream-read-vector ((stream dual-channel-zstream) vector
342 &key start end timeout)
343 (with-synchronized-device-zstream (stream :input)
344 (%zstream-read-vector stream vector start end timeout)))
346 (defmethod %zstream-read-vector ((stream device-zstream) vector
347 start end timeout)
348 (with-accessors ((input-iobuf %db-input-iobuf))
349 stream
350 (cond
351 ((iobuf-empty-p input-iobuf)
352 (let ((nbytes (%zstream-fill stream timeout)))
353 (if (iobuf-empty-p input-iobuf)
354 (if (eql :eof nbytes) :eof 0)
355 (iobuf->vector input-iobuf vector start end))))
357 (iobuf->vector input-iobuf vector start end)))))
359 (defmethod zstream-read-vector ((stream memory-zstream) vector
360 &key start end timeout)
361 (declare (ignore timeout))
362 (with-accessors ((data-vector %mb-data-vector)
363 (input-position %mb-input-position)
364 (output-position %mb-output-position))
365 stream
366 (%check-buffer-available-data stream 1)
367 (replace vector data-vector
368 :start1 input-position :end1 output-position
369 :start2 start :end2 end)
370 (incf input-position (min (- output-position input-position)
371 (- end start)))))
374 ;;;-------------------------------------------------------------------------
375 ;;; WRITE-ELEMENT
376 ;;;-------------------------------------------------------------------------
378 (defmethod zstream-write-element ((stream device-zstream) octet &key timeout)
379 (check-type octet octet)
380 (let ((v (make-array 1 :element-type 'octet :initial-contents octet)))
381 (declare (dynamic-extent v))
382 (zstream-write-vector stream v :timeout timeout)))
384 (defmethod zstream-write-element ((stream memory-zstream) element &key timeout)
385 (declare (ignore timeout))
386 (let ((v (make-array 1 :element-type (%mb-element-type stream)
387 :initial-contents element)))
388 (declare (dynamic-extent v))
389 (zstream-write-vector stream v)))
392 ;;;-------------------------------------------------------------------------
393 ;;; WRITE-VECTOR
394 ;;;-------------------------------------------------------------------------
396 (defmethod zstream-write-vector :around ((stream zstream) vector
397 &key (start 0) end timeout)
398 (check-bounds vector start end)
399 (when (= start end) (return* 0))
400 (call-next-method stream vector :start start :end end :timeout timeout))
402 (defmethod zstream-write-vector ((stream single-channel-zstream) vector
403 &key start end timeout)
404 (with-synchronized-device-zstream (stream :output)
405 ;; If the previous operation was a read, flush the read buffer
406 ;; and reposition the file offset accordingly
407 (%zstream-clear-input stream)
408 (%zstream-write-vector stream vector start end timeout)))
410 (defmethod zstream-write-vector ((stream dual-channel-zstream) vector
411 &key start end timeout)
412 (with-synchronized-device-zstream (stream :output)
413 (%zstream-write-vector stream vector start end timeout)))
415 (defmethod %zstream-write-vector ((stream device-zstream) vector start end timeout)
416 (with-accessors ((output-iobuf %db-output-iobuf))
417 stream
418 (multiple-value-prog1
419 (vector->iobuf output-iobuf vector start end)
420 (when (iobuf-full-p output-iobuf)
421 (%zstream-flush stream timeout)))))
423 (defmethod %zstream-write-vector :after ((stream single-channel-zstream)
424 vector start end timeout)
425 (declare (ignore vector start end timeout))
426 (setf (%sczs-dirtyp stream) t))
428 (defmethod zstream-write-vector ((stream memory-zstream) vector
429 &key (start 0) end timeout)
430 (declare (ignore timeout))
431 (with-accessors ((data-vector %mb-data-vector)
432 (output-position %mb-output-position))
433 stream
434 (%ensure-buffer-capacity stream (length vector))
435 (replace data-vector vector :start1 output-position
436 :start2 start :end2 end)
437 (incf output-position (length vector))))
440 ;;;-------------------------------------------------------------------------
441 ;;; POSITION
442 ;;;-------------------------------------------------------------------------
444 (defmethod zstream-position ((stream single-channel-zstream) &key direction)
445 (declare (ignore direction))
446 (with-synchronized-device-zstream (stream :input)
447 (let ((position (device-position (zstream-device stream))))
448 ;; FIXME: signal proper condition
449 (assert (not (null position)) (position)
450 "A single-channel-zstream's device must not return a NULL device-position.")
451 (if (%sczs-dirtyp stream)
452 (+ position (iobuf-available-octets (%db-output-iobuf stream)))
453 (- position (iobuf-available-octets (%db-input-iobuf stream)))))))
455 (defmethod zstream-position ((stream dual-channel-zstream) &key direction)
456 (declare (ignore direction))
457 (with-synchronized-device-zstream (stream :io)
458 (device-position (zstream-device stream))))
460 (defmethod zstream-position ((stream memory-zstream) &key direction)
461 (ecase direction
462 (:input (%mb-input-position stream))
463 (:output (%mb-output-position stream))))
466 ;;;-------------------------------------------------------------------------
467 ;;; (SETF POSITION)
468 ;;;-------------------------------------------------------------------------
470 (defmethod (setf zstream-position)
471 (position (stream device-zstream) &key direction (from :start))
472 (declare (ignore direction))
473 (with-synchronized-device-zstream (stream :input)
474 (setf (%db-position stream from) position)))
476 (defun (setf %db-position) (position stream from)
477 (setf (device-position (zstream-device stream) from) position))
479 (defmethod (setf zstream-position)
480 (offset (stream memory-zstream) &key direction (from :start))
481 (with-accessors ((data-vector %mb-data-vector)
482 (input-position %mb-input-position)
483 (output-position %mb-output-position))
484 stream
485 (ecase direction
486 (:input
487 (let ((newpos
488 (ecase from
489 (:start offset)
490 (:current (+ input-position offset))
491 (:output (+ output-position offset)))))
492 (check-bounds data-vector newpos output-position)
493 (setf input-position newpos)))
494 (:output
495 (let ((newpos
496 (ecase from
497 (:start offset)
498 (:current (+ output-position offset))
499 (:input (+ input-position offset)))))
500 (%ensure-buffer-capacity stream (- newpos output-position))
501 (setf output-position newpos))))))
504 ;;;-------------------------------------------------------------------------
505 ;;; CLEAR-INPUT
506 ;;;-------------------------------------------------------------------------
508 (defmethod zstream-clear-input ((stream device-zstream))
509 (with-synchronized-device-zstream (stream :input)
510 (%zstream-clear-input stream)))
512 (defmethod %zstream-clear-input ((stream single-channel-zstream))
513 (unless (%sczs-dirtyp stream)
514 (let ((nbytes (iobuf-available-octets (%db-input-iobuf stream))))
515 (unless (zerop nbytes)
516 (setf (%db-position stream :current) (- nbytes)))
517 (iobuf-reset (%db-input-iobuf stream)))))
519 (defmethod %zstream-clear-input ((stream dual-channel-zstream))
520 (iobuf-reset (%db-input-iobuf stream)))
522 (defmethod zstream-clear-input ((stream memory-zstream))
523 (setf (%mb-input-position stream) (%mb-output-position stream)))
526 ;;;-------------------------------------------------------------------------
527 ;;; CLEAR-OUTPUT
528 ;;;-------------------------------------------------------------------------
530 (defmethod zstream-clear-output ((stream device-zstream))
531 (with-synchronized-device-zstream (stream :output)
532 (%zstream-clear-output stream)))
534 (defmethod %zstream-clear-output ((stream single-channel-zstream))
535 (when (%sczs-dirtyp stream)
536 (iobuf-reset (%db-output-iobuf stream))))
538 (defmethod %zstream-clear-output ((stream dual-channel-zstream))
539 (iobuf-reset (%db-output-iobuf stream)))
541 (defmethod zstream-clear-output ((stream memory-zstream))
542 (setf (%mb-output-position stream) (%mb-input-position stream)))
545 ;;;-------------------------------------------------------------------------
546 ;;; FILL-INPUT
547 ;;;-------------------------------------------------------------------------
549 (defmethod zstream-fill ((stream single-channel-zstream) &key timeout)
550 (with-synchronized-device-zstream (stream :input)
551 (%zstream-flush stream timeout)
552 (%zstream-fill stream timeout)))
554 (defmethod zstream-fill ((stream dual-channel-zstream) &key timeout)
555 (with-synchronized-device-zstream (stream :input)
556 (%zstream-fill stream timeout)))
558 (defmethod %zstream-fill ((stream device-zstream) timeout)
559 (with-accessors ((device zstream-device)
560 (input-iobuf %db-input-iobuf))
561 stream
562 (multiple-value-bind (data start end)
563 (iobuf-next-empty-zone input-iobuf)
564 (let ((nbytes
565 (device-read device data :start start
566 :end end :timeout timeout)))
567 (etypecase nbytes
568 ((eql :eof)
569 (error 'end-of-file :stream stream))
570 (unsigned-byte
571 (setf (iobuf-end input-iobuf) (+ start nbytes))
572 (values nbytes (iobuf-available-space input-iobuf))))))))
574 (defmethod zstream-fill ((stream memory-zstream) &key timeout)
575 (declare (ignore stream timeout))
576 (values nil))
579 ;;;-------------------------------------------------------------------------
580 ;;; FLUSH-OUTPUT
581 ;;;-------------------------------------------------------------------------
583 (defmethod zstream-flush ((stream device-zstream) &key timeout)
584 (with-synchronized-device-zstream (stream :output)
585 (%zstream-flush stream timeout)))
587 (defmethod %zstream-flush ((stream device-zstream) timeout)
588 (with-accessors ((device zstream-device)
589 (output-iobuf %db-output-iobuf))
590 stream
591 (when (%sczs-dirtyp stream)
592 (multiple-value-bind (data start end)
593 (iobuf-next-data-zone output-iobuf)
594 (let ((nbytes
595 (device-write device data :start start
596 :end end :timeout timeout)))
597 (etypecase nbytes
598 ((eql :hangup)
599 (error 'hangup :stream stream))
600 (unsigned-byte
601 (setf (iobuf-start output-iobuf) (+ start nbytes))
602 (values nbytes (iobuf-available-octets output-iobuf)))))))))
604 (defmethod %zstream-flush :after ((stream single-channel-zstream) timeout)
605 (declare (ignore timeout))
606 (when (iobuf-empty-p (%db-output-iobuf stream))
607 (setf (%sczs-dirtyp stream) nil)))
609 (defmethod zstream-flush ((stream memory-zstream) &key timeout)
610 (declare (ignore stream timeout))
611 (values nil))
614 ;;;-------------------------------------------------------------------------
615 ;;; MEMORY-ZSTREAM GROW
616 ;;;-------------------------------------------------------------------------
618 (defmethod %ensure-buffer-capacity
619 ((stream memory-zstream) &optional (amount 1))
620 (check-type amount unsigned-byte)
621 (with-accessors ((data-vector %mb-data-vector)
622 (output-position %mb-output-position)
623 (adjust-size %mb-adjust-size)
624 (adjust-threshold %mb-adjust-threshold))
625 stream
626 (let* ((size-needed (+ output-position amount))
627 (threshold (ceiling (* adjust-threshold size-needed))))
628 (when (> threshold (length data-vector))
629 (setf data-vector
630 (adjust-array data-vector
631 (truncate (* adjust-size size-needed))))))))
633 (defmethod %check-buffer-available-data
634 ((stream memory-zstream) &optional (amount 1))
635 (check-type amount positive-integer)
636 (with-accessors ((input-position %mb-input-position)
637 (output-position %mb-output-position))
638 stream
639 (let ((available-data (- output-position input-position)))
640 (check-type available-data unsigned-byte)
641 (cond
642 ((zerop available-data)
643 (error 'end-of-file :stream stream))
644 ((< available-data amount)
645 ;; FIXME: signal proper condition, soft EOF
646 (error "~S elements requested, only ~S available"
647 amount available-data))))))
650 ;;;-------------------------------------------------------------------------
651 ;;; I/O WAIT
652 ;;;-------------------------------------------------------------------------
654 (defmethod zstream-poll ((stream device-zstream) &key direction timeout)
655 (device-poll (zstream-device stream) direction timeout))
657 (defmethod zstream-poll ((stream memory-zstream) &key direction timeout)
658 (declare (ignore timeout))
659 (ecase direction
660 (:input (< (%mb-input-position stream)
661 (%mb-output-position stream)))
662 (:output t)))