Add %SYS-KILL.
[iolib.git] / io.streams / zeta / stream.lisp
blob5c66786169b101bd2941b797aef05bff44642e53
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*-
2 ;;;
3 ;;; --- Zeta Streams.
4 ;;;
6 (in-package :io.zeta-streams)
8 ;;;-------------------------------------------------------------------------
9 ;;; Classes and Types
10 ;;;-------------------------------------------------------------------------
(defclass buffer ()
  ()
  (:documentation "Root of the buffer class hierarchy; declares no slots."))

(defclass device-buffer (buffer)
  ((synchronized :initarg :synchronized)
   (device       :initarg :device)
   (input-iobuf  :initarg :input-buffer)
   (output-iobuf :initarg :output-buffer)
   (buffering    :initarg :buffering))
  (:default-initargs :synchronized nil)
  (:documentation
   "Buffer attached to a device, with one iobuf slot per direction.
SYNCHRONIZED defaults to NIL."))

(defclass memory-buffer (buffer)
  ((data-vector      :initform nil)
   (element-type     :initarg :element-type)
   (input-position   :initform 0)
   (output-position  :initform 0)
   (adjust-size      :initarg :adjust-size)
   (adjust-threshold :initarg :adjust-threshold))
  (:default-initargs
   :element-type t
   :adjust-size 1.5
   :adjust-threshold 1)
  (:documentation
   "Buffer backed by an in-memory vector, with separate input and output
positions that both start at 0."))
(defclass zstream ()
  (external-format)
  (:documentation "Base class of all zeta streams; holds the external format."))

(defclass device-zstream (device-buffer zstream)
  ()
  (:documentation "A zstream whose buffering is provided by DEVICE-BUFFER."))

(defclass single-channel-zstream (device-zstream)
  ((dirtyp :initform nil))
  (:documentation
   "Device zstream with an explicit DIRTYP flag, initially NIL."))

(defclass dual-channel-zstream (device-zstream)
  ()
  (:documentation "Device zstream without a DIRTYP slot."))

(defclass memory-zstream (memory-buffer zstream)
  ()
  (:documentation "A zstream whose buffering is provided by MEMORY-BUFFER."))
;; The empty slot list `()' is mandatory: without it DEFCLASS would try to
;; parse the :DEFAULT-INITARGS option as the slot-specifier list.
(defclass octet-memory-zstream (memory-zstream)
  ()
  (:default-initargs :element-type 'octet)
  (:documentation "Memory zstream whose default element type is OCTET."))

(defclass character-memory-zstream (memory-zstream)
  ()
  (:default-initargs :element-type 'character)
  (:documentation "Memory zstream whose default element type is CHARACTER."))
58 ;;;-------------------------------------------------------------------------
59 ;;; Generic Functions
60 ;;;-------------------------------------------------------------------------
62 ;;; Accessors
(defgeneric zstream-synchronized-p (stream)
  (:documentation "Return true if operations on STREAM take the iobuf locks."))

(defgeneric zstream-dirtyp (stream)
  (:documentation "Return true if STREAM has pending, unflushed output."))

(defgeneric (setf zstream-dirtyp) (value stream)
  (:documentation "Set the dirty state of STREAM to VALUE."))

(defgeneric zstream-device (stream)
  (:documentation "Return the device of STREAM, or NIL if it has none."))

(defgeneric (setf zstream-device) (new-device stream)
  (:documentation "Install NEW-DEVICE as the device of STREAM."))

(defgeneric zstream-external-format (stream)
  (:documentation "Return the external format of STREAM."))

(defgeneric (setf zstream-external-format) (external-format stream)
  (:documentation "Set the external format of STREAM to EXTERNAL-FORMAT."))

(defgeneric zstream-element-type (stream)
  (:documentation "Return the element type of STREAM."))
80 ;;; I/O functions
(defgeneric zstream-read-element (stream &key timeout)
  (:documentation "Read a single element from STREAM and return it."))

(defgeneric zstream-write-element (stream element &key timeout)
  (:documentation "Write the single ELEMENT to STREAM."))

(defgeneric zstream-read-vector (stream vector &key start end timeout)
  (:documentation
   "Read elements from STREAM into VECTOR between START and END."))

(defgeneric zstream-write-vector (stream vector &key start end timeout)
  (:documentation
   "Write the elements of VECTOR between START and END to STREAM."))

;; NOTE(review): no methods for the six generic functions below are visible in
;; this part of the file; the docstrings only restate the lambda lists.
(defgeneric zstream-read-byte (stream &key width signed)
  (:documentation "Byte input on STREAM, parameterized by WIDTH and SIGNED."))

(defgeneric zstream-write-byte (stream byte &key width signed)
  (:documentation
   "Byte output of BYTE on STREAM, parameterized by WIDTH and SIGNED."))

(defgeneric zstream-read-char (stream &key eof-error-p eof-value)
  (:documentation
   "Character input on STREAM; takes EOF-ERROR-P and EOF-VALUE."))

(defgeneric zstream-write-char (stream char &key hangup-error-p hangup-value)
  (:documentation
   "Character output of CHAR on STREAM; takes HANGUP-ERROR-P and HANGUP-VALUE."))

(defgeneric zstream-read-line (stream &key eof-error-p eof-value)
  (:documentation "Line input on STREAM; takes EOF-ERROR-P and EOF-VALUE."))

(defgeneric zstream-write-line
    (stream line &key start end hangup-error-p hangup-value)
  (:documentation
   "Line output of LINE (bounded by START and END) on STREAM; takes
HANGUP-ERROR-P and HANGUP-VALUE."))
102 ;;; Device zstream functions
(defgeneric zstream-position (stream &key direction)
  (:documentation "Return the current position of STREAM."))

(defgeneric (setf zstream-position) (position stream &key direction from)
  (:documentation
   "Move STREAM to POSITION, interpreted relative to FROM."))

(defgeneric zstream-poll (stream &key direction timeout)
  (:documentation "Poll STREAM for readiness in DIRECTION."))

(defgeneric zstream-fill (stream &key timeout)
  (:documentation "Fill the input buffer of STREAM."))

(defgeneric zstream-flush (stream &key timeout)
  (:documentation "Flush the output buffer of STREAM."))

(defgeneric zstream-clear-input (stream)
  (:documentation "Discard the buffered input of STREAM."))

(defgeneric zstream-clear-output (stream)
  (:documentation "Discard the buffered output of STREAM."))
118 ;;; Internal functions
(defgeneric %zstream-read-vector (stream vector start end timeout)
  (:documentation
   "Unlocked worker behind ZSTREAM-READ-VECTOR; all arguments are required."))

(defgeneric %zstream-write-vector (stream vector start end timeout)
  (:documentation
   "Unlocked worker behind ZSTREAM-WRITE-VECTOR; all arguments are required."))

(defgeneric %zstream-fill (stream timeout)
  (:documentation "Unlocked worker behind ZSTREAM-FILL."))

(defgeneric %zstream-flush (stream timeout)
  (:documentation "Unlocked worker behind ZSTREAM-FLUSH."))

(defgeneric %zstream-clear-input (stream)
  (:documentation "Unlocked worker behind ZSTREAM-CLEAR-INPUT."))

(defgeneric %zstream-clear-output (stream)
  (:documentation "Unlocked worker behind ZSTREAM-CLEAR-OUTPUT."))

;; FIXME: choose better name
(defgeneric %ensure-buffer-capacity (stream &optional amount)
  (:documentation
   "Make sure STREAM's buffer can take AMOUNT more elements."))

;; FIXME: choose better name
(defgeneric %check-buffer-available-data (stream &optional amount)
  (:documentation
   "Signal an error unless STREAM has AMOUNT elements available to read."))
139 ;;;-------------------------------------------------------------------------
140 ;;; Accessors
141 ;;;-------------------------------------------------------------------------
(defmethod zstream-synchronized-p ((stream device-zstream))
  "Return the contents of STREAM's SYNCHRONIZED slot."
  (with-slots (synchronized) stream
    synchronized))

(defmethod zstream-synchronized-p ((stream memory-zstream))
  "Memory zstreams always answer NIL."
  (declare (ignore stream))
  nil)
(defmethod zstream-dirtyp ((stream single-channel-zstream))
  "Return the contents of STREAM's DIRTYP slot."
  (with-slots (dirtyp) stream
    dirtyp))

(defmethod zstream-dirtyp ((stream dual-channel-zstream))
  "True when the output iobuf of STREAM holds at least one octet."
  (let ((pending (iobuf-available-octets (slot-value stream 'output-iobuf))))
    (plusp pending)))
(defmethod (setf zstream-dirtyp) (value (stream dual-channel-zstream))
  "No-op for dual-channel zstreams: VALUE is discarded and NIL is returned.
The dirty state of such a stream is derived from its output iobuf instead of
being stored."
  ;; VALUE is an unspecialized parameter, so without this declaration the
  ;; compiler reports it as an unused variable.
  (declare (ignore value))
  (values nil))
(defmethod zstream-device ((stream device-zstream))
  "Return the contents of STREAM's DEVICE slot."
  (with-slots (device) stream
    device))

(defmethod zstream-device ((stream memory-zstream))
  "Memory zstreams have no device: always NIL."
  (declare (ignore stream))
  nil)
(defmethod (setf zstream-device) (new-device (stream device-zstream))
  "Store NEW-DEVICE in STREAM's DEVICE slot and return it."
  (with-slots (device) stream
    (setf device new-device)))

(defmethod (setf zstream-device) (new-device (stream memory-zstream))
  "Memory zstreams have no device: nothing is stored and NIL is returned."
  (declare (ignore new-device stream))
  nil)
(defmethod zstream-external-format ((stream zstream))
  "Return the contents of STREAM's EXTERNAL-FORMAT slot."
  (with-slots (external-format) stream
    external-format))

(defmethod (setf zstream-external-format) (external-format (stream zstream))
  "Pass EXTERNAL-FORMAT through BABEL:ENSURE-EXTERNAL-FORMAT and store the
result in STREAM; the stored object is returned."
  (let ((normalized (babel:ensure-external-format external-format)))
    (setf (slot-value stream 'external-format) normalized)))
(defmethod zstream-element-type ((stream device-zstream))
  "Device zstreams always report (UNSIGNED-BYTE 8)."
  '(unsigned-byte 8))

(defmethod zstream-element-type ((stream memory-zstream))
  "Return the contents of STREAM's ELEMENT-TYPE slot."
  (with-slots (element-type) stream
    element-type))
188 ;;;-------------------------------------------------------------------------
189 ;;; Constructors
190 ;;;-------------------------------------------------------------------------
(defmethod shared-initialize :after
    ((stream single-channel-zstream) slot-names &key data size buffering)
  "Validate the initialization arguments and install one iobuf -- DATA if
supplied, otherwise a fresh one made from SIZE -- as both the input and the
output buffer of STREAM."
  (declare (ignore slot-names))
  (with-slots (device input-iobuf output-iobuf) stream
    (check-type device device)
    (check-type data (or null iobuf))
    (check-type buffering stream-buffering)
    ;; A single-channel stream shares one buffer between both directions.
    (let ((shared-iobuf (or data (make-iobuf size))))
      (setf input-iobuf shared-iobuf
            output-iobuf shared-iobuf))))
(defmethod shared-initialize :after
    ((stream dual-channel-zstream) slot-names
     &key input-data output-data input-size output-size buffering)
  "Validate the initialization arguments and give STREAM one iobuf per
direction: the supplied INPUT-DATA / OUTPUT-DATA, or a fresh iobuf made from
INPUT-SIZE / OUTPUT-SIZE."
  (declare (ignore slot-names))
  (with-slots (device input-iobuf output-iobuf) stream
    (check-type device device)
    (check-type input-data (or null iobuf))
    (check-type output-data (or null iobuf))
    (check-type buffering stream-buffering)
    (setf input-iobuf  (or input-data (make-iobuf input-size))
          output-iobuf (or output-data (make-iobuf output-size)))))
(defmethod shared-initialize :after
    ((stream memory-zstream) slot-names &key data (start 0) end)
  "Validate the growth parameters, upgrade the element type and allocate the
backing vector of STREAM.

When DATA is supplied, its elements between START and END are copied to the
front of a new vector of (TRUNCATE (* ADJUST-SIZE (LENGTH DATA))) elements and
the output position is set to the number of elements copied.  Otherwise an
empty vector of 128 elements is allocated."
  (declare (ignore slot-names))
  (with-slots (data-vector input-position output-position
               element-type adjust-size adjust-threshold)
      stream
    (check-type adjust-size (real 1.001))
    (check-type adjust-threshold (real 0.1 1))
    (setf element-type (upgraded-array-element-type element-type))
    (cond
      (data
       ;; CHECK-BOUNDS is relied upon to default END when it is NIL; the
       ;; subtraction below needs a number.
       (check-bounds data start end)
       (when element-type
         ;; FIXME: signal proper condition
         ;; NOTE(review): this requires the stream's element type to be a
         ;; subtype of DATA's element type; confirm that the opposite
         ;; direction was not intended.
         (assert (subtypep element-type (array-element-type data))))
       (setf data-vector
             (make-array (truncate (* adjust-size (length data)))
                         :element-type (or element-type
                                           (array-element-type data))))
       (setf output-position (- end start))
       (replace data-vector data :start2 start :end2 end))
      ;; Explicit default clause: no initial data was given.
      (t
       (setf data-vector (make-array 128 :element-type element-type))))))
(defmethod shared-initialize :after
    ((stream zstream) slot-names &key (external-format :default))
  "Install EXTERNAL-FORMAT (default :DEFAULT) on STREAM through the
ZSTREAM-EXTERNAL-FORMAT writer."
  (declare (ignore slot-names))
  (setf (zstream-external-format stream) external-format))
(defun make-memory-zstream (&key data (start 0) end (element-type t)
                              (adjust-size 1.5) (adjust-threshold 1)
                              (external-format :default))
  "Create a memory zstream whose class is selected by ELEMENT-TYPE.

After upgrading ELEMENT-TYPE with UPGRADED-ARRAY-ELEMENT-TYPE:
  - a subtype of OCTET yields an OCTET-MEMORY-ZSTREAM,
  - a subtype of CHARACTER yields a CHARACTER-MEMORY-ZSTREAM,
  - anything else yields a plain MEMORY-ZSTREAM with that element type.

DATA, START, END, ADJUST-SIZE, ADJUST-THRESHOLD and EXTERNAL-FORMAT are passed
on as initialization arguments.  EXTERNAL-FORMAT is now forwarded for every
stream class; previously it was only forwarded for octet streams, so the other
two kinds silently fell back to :DEFAULT."
  (let ((element-type (upgraded-array-element-type element-type)))
    (flet ((make (class &rest extra-initargs)
             ;; Initargs common to all three classes live in one place so the
             ;; branches cannot drift apart.
             (apply #'make-instance class
                    :data data :start start :end end
                    :adjust-size adjust-size
                    :adjust-threshold adjust-threshold
                    :external-format external-format
                    extra-initargs)))
      (cond
        ((subtypep element-type 'octet)
         (make 'octet-memory-zstream))
        ((subtypep element-type 'character)
         (make 'character-memory-zstream))
        ((subtypep element-type 't)
         (make 'memory-zstream :element-type element-type))
        ;; Every type is a subtype of T, so this clause is only a safety net.
        (t
         (error 'subtype-error :datum element-type
                :expected-supertype '(or (unsigned-byte 8) character t)))))))
273 ;;;-------------------------------------------------------------------------
274 ;;; Helper macros
275 ;;;-------------------------------------------------------------------------
277 ;; FIXME: synchronize memory streams too ?
(defmacro with-synchronized-device-zstream
    ((stream &optional direction) &body body)
  "Run BODY, holding the iobuf lock(s) of STREAM when it is synchronized.

STREAM is a form evaluated exactly once.  DIRECTION must be one of :INPUT,
:OUTPUT or :IO and is examined at macroexpansion time; :IO takes the input
lock first and the output lock inside it.  When ZSTREAM-SYNCHRONIZED-P
returns false for the stream, BODY runs without taking any lock."
  (with-gensyms (body-fun zstream)
    (labels ((lock-form (slot-name inner)
               ;; Wrap INNER in the lock of the iobuf stored in SLOT-NAME.
               `(bt:with-lock-held
                    ((iobuf-lock (slot-value ,zstream ',slot-name)))
                  ,inner))
             (make-locks (inner direction)
               (ecase direction
                 (:input  (lock-form 'input-iobuf inner))
                 (:output (lock-form 'output-iobuf inner))
                 (:io     (make-locks (make-locks inner :output) :input)))))
      ;; Binding the stream form to a gensym keeps a form with side effects
      ;; from being evaluated once for the test and again for each lock.
      `(let ((,zstream ,stream))
         (flet ((,body-fun () ,@body))
           (declare (dynamic-extent #',body-fun))
           (if (zstream-synchronized-p ,zstream)
               ,(make-locks `(,body-fun) direction)
               (,body-fun)))))))
300 ;;;-------------------------------------------------------------------------
301 ;;; RELINQUISH
302 ;;;-------------------------------------------------------------------------
(defmethod relinquish :after ((stream single-channel-zstream) &key abort)
  "Under the input lock: flush STREAM with a timeout of 0 unless ABORT is
true, then relinquish its device, passing ABORT along."
  (with-synchronized-device-zstream (stream :input)
    (when (not abort)
      (%zstream-flush stream 0))
    (relinquish (zstream-device stream) :abort abort))
  stream)
(defmethod relinquish :after ((stream dual-channel-zstream) &key abort)
  "Under both iobuf locks: flush STREAM with a timeout of 0 unless ABORT is
true, then relinquish its device, passing ABORT along."
  (with-synchronized-device-zstream (stream :io)
    (when (not abort)
      (%zstream-flush stream 0))
    (relinquish (zstream-device stream) :abort abort))
  stream)
319 ;;;-------------------------------------------------------------------------
320 ;;; READ-ELEMENT
321 ;;;-------------------------------------------------------------------------
(defmethod zstream-read-element ((stream device-zstream) &key timeout)
  "Read one octet from STREAM through ZSTREAM-READ-VECTOR and return it."
  ;; NOTE(review): the result of ZSTREAM-READ-VECTOR is not inspected, so the
  ;; cell is returned even if nothing was stored into it.
  (let ((cell (make-array 1 :element-type 'octet)))
    (declare (dynamic-extent cell))
    (zstream-read-vector stream cell :timeout timeout)
    (aref cell 0)))
(defmethod zstream-read-element ((stream memory-zstream) &key timeout)
  "Read one element from STREAM through ZSTREAM-READ-VECTOR and return it.
TIMEOUT is ignored."
  (declare (ignore timeout))
  (let* ((type (slot-value stream 'element-type))
         (cell (make-array 1 :element-type type)))
    (declare (dynamic-extent cell))
    (zstream-read-vector stream cell)
    (aref cell 0)))
337 ;;;-------------------------------------------------------------------------
338 ;;; READ-VECTOR
339 ;;;-------------------------------------------------------------------------
(defmethod zstream-read-vector :around
    ((stream zstream) vector &key (start 0) end timeout)
  "Check START and END against VECTOR, answer 0 for an empty range, and
otherwise call the next method with the checked bounds."
  (check-bounds vector start end)
  (cond
    ;; Nothing to transfer: do not bother the primary method.
    ((= start end) 0)
    (t (call-next-method stream vector
                         :start start :end end :timeout timeout))))
(defmethod zstream-read-vector
    ((stream single-channel-zstream) vector &key start end timeout)
  "Run %ZSTREAM-READ-VECTOR under the input lock of STREAM."
  (with-synchronized-device-zstream (stream :input)
    (%zstream-read-vector stream vector start end timeout)))

(defmethod zstream-read-vector
    ((stream dual-channel-zstream) vector &key start end timeout)
  "Run %ZSTREAM-READ-VECTOR under the input lock of STREAM."
  (with-synchronized-device-zstream (stream :input)
    (%zstream-read-vector stream vector start end timeout)))
(defmethod %zstream-read-vector
    ((stream device-zstream) vector start end timeout)
  "Copy buffered input of STREAM into VECTOR between START and END.

If the input iobuf is empty it is refilled first.  If it is still empty after
the refill, return :EOF when %ZSTREAM-FILL returned :EOF and 0 otherwise.  In
every other case return what IOBUF->VECTOR returns."
  (with-slots (input-iobuf) stream
    (cond
      ((iobuf-empty-p input-iobuf)
       (let ((nbytes (%zstream-fill stream timeout)))
         (if (iobuf-empty-p input-iobuf)
             (if (eql :eof nbytes) :eof 0)
             (iobuf->vector input-iobuf vector start end))))
      ;; Explicit default clause: data is already buffered, just copy it out.
      (t
       (iobuf->vector input-iobuf vector start end)))))
(defmethod zstream-read-vector
    ((stream memory-zstream) vector &key start end timeout)
  "Copy unread elements of STREAM into VECTOR between START and END.

At most (- END START) elements are copied, and never more than the stream
holds between its input and output positions.  The input position advances
by the number of elements copied and its new value is returned.  An error is
signalled by %CHECK-BUFFER-AVAILABLE-DATA when nothing is available.
TIMEOUT is ignored."
  (declare (ignore timeout))
  (with-slots (data-vector input-position output-position) stream
    (%check-buffer-available-data stream 1)
    ;; In REPLACE, :START1/:END1 bound the target (VECTOR) and :START2/:END2
    ;; bound the source (the stream's data vector).  The two pairs used to be
    ;; swapped, which wrote to the wrong part of VECTOR and read the wrong
    ;; part of the buffer.
    (replace vector data-vector
             :start1 start :end1 end
             :start2 input-position :end2 output-position)
    (incf input-position (min (- output-position input-position)
                              (- end start)))))
383 ;;;-------------------------------------------------------------------------
384 ;;; WRITE-ELEMENT
385 ;;;-------------------------------------------------------------------------
(defmethod zstream-write-element ((stream device-zstream) octet &key timeout)
  "Write the single OCTET to STREAM through ZSTREAM-WRITE-VECTOR.
Signals a TYPE-ERROR if OCTET is not of type OCTET."
  (check-type octet octet)
  ;; :INITIAL-ELEMENT takes the element itself; :INITIAL-CONTENTS, used here
  ;; before, requires a sequence and is an error when given a bare integer.
  (let ((v (make-array 1 :element-type 'octet :initial-element octet)))
    (declare (dynamic-extent v))
    (zstream-write-vector stream v :timeout timeout)))
(defmethod zstream-write-element ((stream memory-zstream) element &key timeout)
  "Write the single ELEMENT to STREAM through ZSTREAM-WRITE-VECTOR.
TIMEOUT is ignored."
  (declare (ignore timeout))
  ;; :INITIAL-ELEMENT takes the element itself; :INITIAL-CONTENTS, used here
  ;; before, requires a sequence of elements.
  (let ((v (make-array 1 :element-type (slot-value stream 'element-type)
                         :initial-element element)))
    (declare (dynamic-extent v))
    (zstream-write-vector stream v)))
401 ;;;-------------------------------------------------------------------------
402 ;;; WRITE-VECTOR
403 ;;;-------------------------------------------------------------------------
(defmethod zstream-write-vector :around
    ((stream zstream) vector &key (start 0) end timeout)
  "Check START and END against VECTOR, answer 0 for an empty range, and
otherwise call the next method with the checked bounds."
  (check-bounds vector start end)
  (cond
    ;; Nothing to transfer: do not bother the primary method.
    ((= start end) 0)
    (t (call-next-method stream vector
                         :start start :end end :timeout timeout))))
(defmethod zstream-write-vector
    ((stream single-channel-zstream) vector &key start end timeout)
  "Under the output lock of STREAM, clear pending input and then run
%ZSTREAM-WRITE-VECTOR."
  (with-synchronized-device-zstream (stream :output)
    ;; If the previous operation was a read, flush the read buffer
    ;; and reposition the file offset accordingly
    (%zstream-clear-input stream)
    (%zstream-write-vector stream vector start end timeout)))

(defmethod zstream-write-vector
    ((stream dual-channel-zstream) vector &key start end timeout)
  "Run %ZSTREAM-WRITE-VECTOR under the output lock of STREAM."
  (with-synchronized-device-zstream (stream :output)
    (%zstream-write-vector stream vector start end timeout)))
(defmethod %zstream-write-vector
    ((stream device-zstream) vector start end timeout)
  "Copy VECTOR between START and END into the output iobuf of STREAM and flush
the stream if that leaves the iobuf full.  Returns the values of
VECTOR->IOBUF."
  (let ((iobuf (slot-value stream 'output-iobuf)))
    (multiple-value-prog1 (vector->iobuf iobuf vector start end)
      (when (iobuf-full-p iobuf)
        (%zstream-flush stream timeout)))))
(defmethod %zstream-write-vector :after
    ((stream single-channel-zstream) vector start end timeout)
  "Mark STREAM as dirty once a write has gone through."
  (declare (ignore vector start end timeout))
  (with-slots (dirtyp) stream
    (setf dirtyp t)))
(defmethod zstream-write-vector
    ((stream memory-zstream) vector &key (start 0) end timeout)
  "Append the elements of VECTOR between START and END to STREAM.

The buffer is grown as needed, the output position advances by the number of
elements written, and the new output position is returned.  TIMEOUT is
ignored."
  (declare (ignore timeout))
  (with-slots (data-vector output-position) stream
    ;; Size everything by the range actually written.  Using the length of
    ;; the whole VECTOR over-advanced the output position whenever START/END
    ;; selected a sub-range, exposing elements that were never written.
    (let* ((end (or end (length vector)))
           (count (- end start)))
      (%ensure-buffer-capacity stream count)
      (replace data-vector vector
               :start1 output-position
               :start2 start :end2 end)
      (incf output-position count))))
448 ;;;-------------------------------------------------------------------------
449 ;;; POSITION
450 ;;;-------------------------------------------------------------------------
(defmethod zstream-position ((stream single-channel-zstream) &key direction)
  "Return the device position of STREAM adjusted for buffered data.

When the stream is dirty, the octets waiting in the output iobuf are added to
the device position; otherwise the octets still available in the input iobuf
are subtracted.  DIRECTION is ignored."
  (declare (ignore direction))
  (with-slots (input-iobuf output-iobuf dirtyp) stream
    (with-synchronized-device-zstream (stream :input)
      (let ((position (device-position (zstream-device stream))))
        ;; FIXME: signal proper condition
        (assert position (position)
                "A single-channel-zstream's device must not return a NULL device-position.")
        (cond
          (dirtyp (+ position (iobuf-available-octets output-iobuf)))
          (t      (- position (iobuf-available-octets input-iobuf))))))))
(defmethod zstream-position ((stream dual-channel-zstream) &key direction)
  "Return the position of STREAM's device, read under both iobuf locks.
DIRECTION is ignored."
  (declare (ignore direction))
  (with-synchronized-device-zstream (stream :io)
    (let ((device (zstream-device stream)))
      (device-position device))))
(defmethod zstream-position ((stream memory-zstream) &key direction)
  "Return the input or output position of STREAM, selected by DIRECTION,
which must be :INPUT or :OUTPUT."
  (with-slots (input-position output-position) stream
    (ecase direction
      (:input  input-position)
      (:output output-position))))
476 ;;;-------------------------------------------------------------------------
477 ;;; (SETF POSITION)
478 ;;;-------------------------------------------------------------------------
(defmethod (setf zstream-position)
    (position (stream device-zstream) &key direction (from :start))
  "Reposition STREAM's device to POSITION relative to FROM (default :START),
under the input lock.  DIRECTION is ignored."
  (declare (ignore direction))
  (with-synchronized-device-zstream (stream :input)
    (setf (device-zstream-position stream from) position)))
(defun (setf device-zstream-position) (position stream from)
  "Set the position of STREAM's device to POSITION, relative to FROM."
  (let ((device (zstream-device stream)))
    (setf (device-position device from) position)))
(defmethod (setf zstream-position)
    (offset (stream memory-zstream) &key direction (from :start))
  "Move the input or output position of STREAM and return the new position.

DIRECTION must be :INPUT or :OUTPUT.  OFFSET is interpreted relative to FROM:
:START (absolute), :CURRENT (the position being moved), or the opposite
position (:OUTPUT when moving the input position, :INPUT when moving the
output position).

A new input position is validated with CHECK-BOUNDS against the output
position.  A new output position must be a non-negative integer; the buffer
is grown when the position moves forward."
  (with-slots (data-vector input-position output-position) stream
    (ecase direction
      (:input
       (let ((newpos
               (ecase from
                 (:start offset)
                 (:current (+ input-position offset))
                 (:output (+ output-position offset)))))
         (check-bounds data-vector newpos output-position)
         (setf input-position newpos)))
      (:output
       (let ((newpos
               (ecase from
                 (:start offset)
                 (:current (+ output-position offset))
                 (:input (+ input-position offset)))))
         (check-type newpos unsigned-byte)
         ;; Only a forward move can need more room.  A backward move used to
         ;; hand a negative amount to %ENSURE-BUFFER-CAPACITY, which rejects
         ;; it, so the output position could never be moved back.
         (when (> newpos output-position)
           (%ensure-buffer-capacity stream (- newpos output-position)))
         (setf output-position newpos))))))
512 ;;;-------------------------------------------------------------------------
513 ;;; CLEAR-INPUT
514 ;;;-------------------------------------------------------------------------
(defmethod zstream-clear-input ((stream device-zstream))
  "Run %ZSTREAM-CLEAR-INPUT under the input lock of STREAM."
  (with-synchronized-device-zstream (stream :input)
    (%zstream-clear-input stream)))
(defmethod %zstream-clear-input ((stream single-channel-zstream))
  "Unless STREAM is dirty, move the device position back by the number of
octets still available in the input iobuf and reset that iobuf."
  (with-slots (input-iobuf dirtyp) stream
    (when (not dirtyp)
      (let ((unread (iobuf-available-octets input-iobuf)))
        ;; Octets that were buffered but never consumed are handed back by
        ;; seeking backwards relative to the current device position.
        (when (not (zerop unread))
          (setf (device-zstream-position stream :current) (- unread)))
        (iobuf-reset input-iobuf)))))
(defmethod %zstream-clear-input ((stream dual-channel-zstream))
  "Reset the input iobuf of STREAM."
  (with-slots (input-iobuf) stream
    (iobuf-reset input-iobuf)))

(defmethod zstream-clear-input ((stream memory-zstream))
  "Drop all unread data by moving the input position of STREAM up to its
output position."
  (with-slots (input-position output-position) stream
    (setf input-position output-position)))
537 ;;;-------------------------------------------------------------------------
538 ;;; CLEAR-OUTPUT
539 ;;;-------------------------------------------------------------------------
(defmethod zstream-clear-output ((stream device-zstream))
  "Run %ZSTREAM-CLEAR-OUTPUT under the output lock of STREAM."
  (with-synchronized-device-zstream (stream :output)
    (%zstream-clear-output stream)))
(defmethod %zstream-clear-output ((stream single-channel-zstream))
  "Reset the output iobuf of STREAM, but only when the stream is dirty."
  (with-slots (output-iobuf dirtyp) stream
    (and dirtyp
         (iobuf-reset output-iobuf))))
(defmethod %zstream-clear-output ((stream dual-channel-zstream))
  "Reset the output iobuf of STREAM."
  (with-slots (output-iobuf) stream
    (iobuf-reset output-iobuf)))

(defmethod zstream-clear-output ((stream memory-zstream))
  "Move the output position of STREAM back to its input position."
  (with-slots (input-position output-position) stream
    (setf output-position input-position)))
559 ;;;-------------------------------------------------------------------------
560 ;;; FILL-INPUT
561 ;;;-------------------------------------------------------------------------
(defmethod zstream-fill ((stream single-channel-zstream) &key timeout)
  "Under the input lock of STREAM: flush pending output, then refill the
input buffer.  Returns the values of %ZSTREAM-FILL."
  (with-synchronized-device-zstream (stream :input)
    ;; The flush must come first: both directions share one iobuf.
    (%zstream-flush stream timeout)
    (%zstream-fill stream timeout)))

(defmethod zstream-fill ((stream dual-channel-zstream) &key timeout)
  "Run %ZSTREAM-FILL under the input lock of STREAM."
  (with-synchronized-device-zstream (stream :input)
    (%zstream-fill stream timeout)))
(defmethod %zstream-fill ((stream device-zstream) timeout)
  "Read from STREAM's device into the next empty zone of its input iobuf.

Returns two values: the number of octets read and the space left in the input
iobuf.  Signals END-OF-FILE when DEVICE-READ returns :EOF."
  ;; NOTE(review): because :EOF is turned into an error here, callers never
  ;; receive :EOF as a return value from this method.
  (with-slots (device input-iobuf) stream
    (multiple-value-bind (zone zone-start zone-end)
        (iobuf-next-empty-zone input-iobuf)
      (let ((nbytes (device-read device zone
                                 :start zone-start
                                 :end zone-end
                                 :timeout timeout)))
        (etypecase nbytes
          (unsigned-byte
           ;; Extend the filled region by what the device delivered.
           (setf (iobuf-end input-iobuf) (+ zone-start nbytes))
           (values nbytes (iobuf-available-space input-iobuf)))
          ((eql :eof)
           (error 'end-of-file :stream stream)))))))
(defmethod zstream-fill ((stream memory-zstream) &key timeout)
  "Nothing to fill for a memory zstream: always NIL."
  (declare (ignore stream timeout))
  nil)
592 ;;;-------------------------------------------------------------------------
593 ;;; FLUSH-OUTPUT
594 ;;;-------------------------------------------------------------------------
(defmethod zstream-flush ((stream device-zstream) &key timeout)
  "Run %ZSTREAM-FLUSH under the output lock of STREAM."
  (with-synchronized-device-zstream (stream :output)
    (%zstream-flush stream timeout)))
(defmethod %zstream-flush ((stream device-zstream) timeout)
  "Write the next data zone of STREAM's output iobuf to its device.

Does nothing and returns NIL when the stream is not dirty.  Otherwise returns
two values: the number of octets written and the number of octets still
waiting in the output iobuf.  Signals HANGUP when DEVICE-WRITE returns
:HANGUP."
  (with-slots (device output-iobuf) stream
    ;; Ask through the generic function rather than reading a DIRTYP slot:
    ;; only single-channel zstreams have that slot, so reading it directly
    ;; made every flush of a dual-channel zstream fail with a missing-slot
    ;; error.
    (when (zstream-dirtyp stream)
      (multiple-value-bind (data start end)
          (iobuf-next-data-zone output-iobuf)
        (let ((nbytes
                (device-write device data :start start
                                          :end end :timeout timeout)))
          (etypecase nbytes
            ((eql :hangup)
             (error 'hangup :stream stream))
            (unsigned-byte
             ;; Drop what the device accepted from the front of the buffer.
             (setf (iobuf-start output-iobuf) (+ start nbytes))
             (values nbytes (iobuf-available-octets output-iobuf)))))))))
(defmethod %zstream-flush :after ((stream single-channel-zstream) timeout)
  "Clear the dirty flag of STREAM once its output iobuf has been emptied."
  (declare (ignore timeout))
  (when (iobuf-empty-p (slot-value stream 'output-iobuf))
    (setf (slot-value stream 'dirtyp) nil)))
(defmethod zstream-flush ((stream memory-zstream) &key timeout)
  "Nothing to flush for a memory zstream: always NIL."
  (declare (ignore stream timeout))
  nil)
628 ;;;-------------------------------------------------------------------------
629 ;;; MEMORY-ZSTREAM GROW
630 ;;;-------------------------------------------------------------------------
(defmethod %ensure-buffer-capacity
    ((stream memory-zstream) &optional (amount 1))
  "Make sure the data vector of STREAM can hold AMOUNT more elements past the
output position, growing it when necessary.

The vector grows when the size needed exceeds ADJUST-THRESHOLD times the
current capacity; its new size is ADJUST-SIZE times the size needed, and
never less than the size needed.  Returns the new vector when it grew and NIL
otherwise."
  (check-type amount unsigned-byte)
  (with-slots (data-vector output-position adjust-size adjust-threshold)
      stream
    ;; The threshold is applied to the current capacity.  It used to scale the
    ;; size needed instead, so a threshold below 1 could leave the vector
    ;; smaller than the data about to be stored in it.  With the default
    ;; threshold of 1 both formulations grow in exactly the same cases.
    ;; NOTE(review): reading ADJUST-THRESHOLD as a fill ratio is an
    ;; interpretation; confirm against the intended semantics.
    (let* ((size-needed (+ output-position amount))
           (capacity (length data-vector))
           (grow-limit (floor (* adjust-threshold capacity))))
      (when (> size-needed grow-limit)
        (setf data-vector
              (adjust-array data-vector
                            (max size-needed
                                 (truncate (* adjust-size size-needed)))))))))
(defmethod %check-buffer-available-data
    ((stream memory-zstream) &optional (amount 1))
  "Verify that STREAM holds at least AMOUNT unread elements.

Signals END-OF-FILE when nothing at all is available, and a simple error when
fewer than AMOUNT elements are available.  Returns NIL otherwise."
  (check-type amount positive-integer)
  (with-slots (input-position output-position) stream
    (let ((available-data (- output-position input-position)))
      (check-type available-data unsigned-byte)
      (when (zerop available-data)
        (error 'end-of-file :stream stream))
      (when (< available-data amount)
        ;; FIXME: signal proper condition, soft EOF
        (error "~S elements requested, only ~S available"
               amount available-data)))))
660 ;;;-------------------------------------------------------------------------
661 ;;; I/O WAIT
662 ;;;-------------------------------------------------------------------------
(defmethod zstream-poll ((stream device-zstream) &key direction timeout)
  "Delegate to DEVICE-POLL on STREAM's device with DIRECTION and TIMEOUT."
  (let ((device (zstream-device stream)))
    (device-poll device direction timeout)))
(defmethod zstream-poll ((stream memory-zstream) &key direction timeout)
  "Report readiness of STREAM without waiting; TIMEOUT is ignored.

For :INPUT the result is true when the input position is below the output
position; for :OUTPUT it is always T.  Any other DIRECTION is an error."
  (declare (ignore timeout))
  (ecase direction
    (:input (< (slot-value stream 'input-position)
               (slot-value stream 'output-position)))
    (:output t)))