1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // Pipe adapter to connect code expecting an io.Reader
6 // with code expecting an io.Writer.
16 type pipeResult
struct {
21 // Shared pipe structure.
23 // Reader sends on r1, receives on r2.
24 // Writer does the same on w1, w2.
26 r2
, w2
chan pipeResult
28 rclose
chan os
.Error
// read close; error to return to writers
29 wclose
chan os
.Error
// write close; error to return to readers
31 done
chan int // read or write half is done
34 func (p
*pipe
) run() {
36 rb
[]byte // pending Read
37 wb
[]byte // pending Write
38 wn
int // amount written so far from wb
39 rerr os
.Error
// if read end is closed, error to send to writers
40 werr os
.Error
// if write end is closed, error to send to readers
41 r1
chan []byte // p.r1 or nil depending on whether Read is ok
42 w1
chan []byte // p.w1 or nil depending on whether Write is ok
46 // Read and Write are enabled at the start.
53 if ndone
++; ndone
== 2 {
54 // both reader and writer are gone
55 // close out any existing i/o
57 p
.r2
<- pipeResult
{0, os
.EINVAL
}
60 p
.w2
<- pipeResult
{0, os
.EINVAL
}
65 case rerr
= <-p
.rclose
:
67 // finish pending Write
68 p
.w2
<- pipeResult
{wn
, rerr
}
70 w1
= p
.w1
// allow another Write
73 // Close of read side during Read.
74 // finish pending Read with os.EINVAL.
75 p
.r2
<- pipeResult
{0, os
.EINVAL
}
76 r1
= p
.r1
// allow another Read
79 case werr
= <-p
.wclose
:
81 // finish pending Read
82 p
.r2
<- pipeResult
{0, werr
}
83 r1
= p
.r1
// allow another Read
86 // Close of write side during Write.
87 // finish pending Write with os.EINVAL.
88 p
.w2
<- pipeResult
{wn
, os
.EINVAL
}
90 w1
= p
.w1
// allow another Write
95 // write end is closed
96 p
.r2
<- pipeResult
{0, werr
}
100 // read end is closed
101 p
.r2
<- pipeResult
{0, os
.EINVAL
}
104 r1
= nil // disable Read until this one is done
107 // read end is closed
108 p
.w2
<- pipeResult
{0, rerr
}
112 // write end is closed
113 p
.w2
<- pipeResult
{0, os
.EINVAL
}
116 w1
= nil // disable Write until this one is done
119 if r1
== nil && w1
== nil {
120 // Have rb and wb. Execute.
126 p
.r2
<- pipeResult
{n
, nil}
127 r1
= p
.r1
// allow another Read
129 // Maybe finish Write.
131 p
.w2
<- pipeResult
{wn
, nil}
133 w1
= p
.w1
// allow another Write
139 // Read/write halves of the pipe.
140 // They are separate structures for two reasons:
141 // 1. If one end becomes garbage without being Closed,
142 // its finalizer can Close so that the other end
143 // does not hang indefinitely.
144 // 2. Clients cannot use interface conversions on the
145 // read end to find the Write method, and vice versa.
147 type pipeHalf
struct {
160 func (p
*pipeHalf
) rw(data
[]byte) (n
int, err os
.Error
) {
161 // Run i/o operation.
162 // Check ioclosed flag under lock to make sure we're still allowed to do i/o.
171 return res
.n
, res
.err
174 func (p
*pipeHalf
) close(err os
.Error
) os
.Error
{
176 // Only first call to close does anything.
185 // First, send the close notification.
188 // Runner is now responding to rw operations
189 // with os.EINVAL. Cut off future rw operations
190 // by setting ioclosed flag.
195 // With ioclosed set, there will be no more rw operations
196 // working on the channels.
197 // Tell the runner we won't be bothering it anymore.
200 // Successfully torn down; can disable finalizer.
201 runtime
.SetFinalizer(p
, nil)
206 func (p
*pipeHalf
) finalizer() {
211 // A PipeReader is the read half of a pipe.
212 type PipeReader
struct {
216 // Read implements the standard Read interface:
217 // it reads data from the pipe, blocking until a writer
218 // arrives or the write end is closed.
219 // If the write end is closed with an error, that error is
220 // returned as err; otherwise err is os.EOF.
221 func (r
*PipeReader
) Read(data
[]byte) (n
int, err os
.Error
) {
225 // Close closes the reader; subsequent writes to the
226 // write half of the pipe will return the error os.EPIPE.
227 func (r
*PipeReader
) Close() os
.Error
{
228 return r
.CloseWithError(nil)
231 // CloseWithError closes the reader; subsequent writes
232 // to the write half of the pipe will return the error err.
233 func (r
*PipeReader
) CloseWithError(err os
.Error
) os
.Error
{
240 // A PipeWriter is the write half of a pipe.
241 type PipeWriter
struct {
245 // Write implements the standard Write interface:
246 // it writes data to the pipe, blocking until readers
247 // have consumed all the data or the read end is closed.
248 // If the read end is closed with an error, that error is
249 // returned as err; otherwise err is os.EPIPE.
250 func (w
*PipeWriter
) Write(data
[]byte) (n
int, err os
.Error
) {
254 // Close closes the writer; subsequent reads from the
255 // read half of the pipe will return no bytes and os.EOF.
256 func (w
*PipeWriter
) Close() os
.Error
{
257 return w
.CloseWithError(nil)
260 // CloseWithError closes the writer; subsequent reads from the
261 // read half of the pipe will return no bytes and the error err.
262 func (w
*PipeWriter
) CloseWithError(err os
.Error
) os
.Error
{
269 // Pipe creates a synchronous in-memory pipe.
270 // It can be used to connect code expecting an io.Reader
271 // with code expecting an io.Writer.
272 // Reads on one end are matched with writes on the other,
273 // copying data directly between the two; there is no internal buffering.
274 func Pipe() (*PipeReader
, *PipeWriter
) {
276 r1
: make(chan []byte),
277 r2
: make(chan pipeResult
),
278 w1
: make(chan []byte),
279 w2
: make(chan pipeResult
),
280 rclose
: make(chan os
.Error
),
281 wclose
: make(chan os
.Error
),
282 done
: make(chan int),
286 // NOTE: Cannot use composite literal here:
287 // pipeHalf{c1: p.r1, c2: p.r2, cclose: p.rclose, cdone: p.done}
288 // because this implicitly copies the pipeHalf, which copies the inner mutex.
295 runtime
.SetFinalizer(r
, (*PipeReader
).finalizer
)
302 runtime
.SetFinalizer(w
, (*PipeWriter
).finalizer
)