tee_input: allow using Revactor::TCP::Socket objects
[unicorn.git] / lib / unicorn / tee_input.rb
blobaa1dff348d6f7300a84195c4446457ab5af3bae7
1 # Copyright (c) 2009 Eric Wong
2 # You can redistribute it and/or modify it under the same terms as Ruby.
4 # acts like tee(1) on an input input to provide a input-like stream
5 # while providing rewindable semantics through a File/StringIO
6 # backing store.  On the first pass, the input is only read on demand
7 # so your Rack application can use input notification (upload progress
8 # and like).  This should fully conform to the Rack::InputWrapper
9 # specification on the public API.  This class is intended to be a
10 # strict interpretation of Rack::InputWrapper functionality and will
11 # not support any deviations from it.
13 module Unicorn
14   class TeeInput < Struct.new(:socket, :req, :parser, :buf)
16     def initialize(*args)
17       super(*args)
18       @size = parser.content_length
19       @tmp = @size && @size < Const::MAX_BODY ? StringIO.new(Z.dup) : Util.tmpio
20       @buf2 = buf.dup
21       if buf.size > 0
22         parser.filter_body(@buf2, buf) and finalize_input
23         @tmp.write(@buf2)
24         @tmp.seek(0)
25       end
27       # give our socket object a readpartial if it can't handle it
28       if socket && ! socket.respond_to?(:readpartial)
29         def socket.readpartial(nr, buf = Unicorn::Z.dup)
30           buf.replace(read)
31         end
32       end
33     end
35     # returns the size of the input.  This is what the Content-Length
36     # header value should be, and how large our input is expected to be.
37     # For TE:chunked, this requires consuming all of the input stream
38     # before returning since there's no other way
39     def size
40       @size and return @size
42       if socket
43         pos = @tmp.pos
44         while tee(Const::CHUNK_SIZE, @buf2)
45         end
46         @tmp.seek(pos)
47       end
49       @size = tmp_size
50     end
52     def read(*args)
53       socket or return @tmp.read(*args)
55       length = args.shift
56       if nil == length
57         rv = @tmp.read || Z.dup
58         while tee(Const::CHUNK_SIZE, @buf2)
59           rv << @buf2
60         end
61         rv
62       else
63         rv = args.shift || @buf2.dup
64         diff = tmp_size - @tmp.pos
65         if 0 == diff
66           tee(length, rv)
67         else
68           @tmp.read(diff > length ? length : diff, rv)
69         end
70       end
71     end
73     # takes zero arguments for strict Rack::Lint compatibility, unlike IO#gets
74     def gets
75       socket or return @tmp.gets
76       nil == $/ and return read
78       orig_size = tmp_size
79       if @tmp.pos == orig_size
80         tee(Const::CHUNK_SIZE, @buf2) or return nil
81         @tmp.seek(orig_size)
82       end
84       line = @tmp.gets # cannot be nil here since size > pos
85       $/ == line[-$/.size, $/.size] and return line
87       # unlikely, if we got here, then @tmp is at EOF
88       begin
89         orig_size = @tmp.pos
90         tee(Const::CHUNK_SIZE, @buf2) or break
91         @tmp.seek(orig_size)
92         line << @tmp.gets
93         $/ == line[-$/.size, $/.size] and return line
94         # @tmp is at EOF again here, retry the loop
95       end while true
97       line
98     end
100     def each(&block)
101       while line = gets
102         yield line
103       end
105       self # Rack does not specify what the return value is here
106     end
108     def rewind
109       @tmp.rewind # Rack does not specify what the return value is here
110     end
112   private
114     # tees off a +length+ chunk of data from the input into the IO
115     # backing store as well as returning it.  +buf+ must be specified.
116     # returns nil if reading from the input returns nil
117     def tee(length, dst)
118       unless parser.body_eof?
119         begin
120           if parser.filter_body(dst, socket.readpartial(length, buf)).nil?
121             @tmp.write(dst)
122             return dst
123           end
124         rescue EOFError
125         end
126       end
127       finalize_input
128     end
130     def finalize_input
131       while parser.trailers(req, buf).nil?
132         buf << socket.readpartial(Const::CHUNK_SIZE, @buf2)
133       end
134       self.socket = nil
135     end
137     def tmp_size
138       StringIO === @tmp ? @tmp.size : @tmp.stat.size
139     end
141   end