2011-02-15 Tobias Burnus <burnus@net-b.de>
[official-gcc.git] / libgo / go / io / pipe.go
blobdf76418b93da55d722fbd3090c14efcf3eadb545
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // Pipe adapter to connect code expecting an io.Reader
6 // with code expecting an io.Writer.
8 package io
10 import (
11 "os"
12 "runtime"
13 "sync"
16 type pipeResult struct {
17 n int
18 err os.Error
21 // Shared pipe structure.
22 type pipe struct {
23 // Reader sends on cr1, receives on cr2.
24 // Writer does the same on cw1, cw2.
25 r1, w1 chan []byte
26 r2, w2 chan pipeResult
28 rclose chan os.Error // read close; error to return to writers
29 wclose chan os.Error // write close; error to return to readers
31 done chan int // read or write half is done
34 func (p *pipe) run() {
35 var (
36 rb []byte // pending Read
37 wb []byte // pending Write
38 wn int // amount written so far from wb
39 rerr os.Error // if read end is closed, error to send to writers
40 werr os.Error // if write end is closed, error to send to readers
41 r1 chan []byte // p.cr1 or nil depending on whether Read is ok
42 w1 chan []byte // p.cw1 or nil depending on whether Write is ok
43 ndone int
46 // Read and Write are enabled at the start.
47 r1 = p.r1
48 w1 = p.w1
50 for {
51 select {
52 case <-p.done:
53 if ndone++; ndone == 2 {
54 // both reader and writer are gone
55 // close out any existing i/o
56 if r1 == nil {
57 p.r2 <- pipeResult{0, os.EINVAL}
59 if w1 == nil {
60 p.w2 <- pipeResult{0, os.EINVAL}
62 return
64 continue
65 case rerr = <-p.rclose:
66 if w1 == nil {
67 // finish pending Write
68 p.w2 <- pipeResult{wn, rerr}
69 wn = 0
70 w1 = p.w1 // allow another Write
72 if r1 == nil {
73 // Close of read side during Read.
74 // finish pending Read with os.EINVAL.
75 p.r2 <- pipeResult{0, os.EINVAL}
76 r1 = p.r1 // allow another Read
78 continue
79 case werr = <-p.wclose:
80 if r1 == nil {
81 // finish pending Read
82 p.r2 <- pipeResult{0, werr}
83 r1 = p.r1 // allow another Read
85 if w1 == nil {
86 // Close of write side during Write.
87 // finish pending Write with os.EINVAL.
88 p.w2 <- pipeResult{wn, os.EINVAL}
89 wn = 0
90 w1 = p.w1 // allow another Write
92 continue
93 case rb = <-r1:
94 if werr != nil {
95 // write end is closed
96 p.r2 <- pipeResult{0, werr}
97 continue
99 if rerr != nil {
100 // read end is closed
101 p.r2 <- pipeResult{0, os.EINVAL}
102 continue
104 r1 = nil // disable Read until this one is done
105 case wb = <-w1:
106 if rerr != nil {
107 // read end is closed
108 p.w2 <- pipeResult{0, rerr}
109 continue
111 if werr != nil {
112 // write end is closed
113 p.w2 <- pipeResult{0, os.EINVAL}
114 continue
116 w1 = nil // disable Write until this one is done
119 if r1 == nil && w1 == nil {
120 // Have rb and wb. Execute.
121 n := copy(rb, wb)
122 wn += n
123 wb = wb[n:]
125 // Finish Read.
126 p.r2 <- pipeResult{n, nil}
127 r1 = p.r1 // allow another Read
129 // Maybe finish Write.
130 if len(wb) == 0 {
131 p.w2 <- pipeResult{wn, nil}
132 wn = 0
133 w1 = p.w1 // allow another Write
139 // Read/write halves of the pipe.
140 // They are separate structures for two reasons:
141 // 1. If one end becomes garbage without being Closed,
142 // its finalizer can Close so that the other end
143 // does not hang indefinitely.
144 // 2. Clients cannot use interface conversions on the
145 // read end to find the Write method, and vice versa.
147 type pipeHalf struct {
148 c1 chan []byte
149 c2 chan pipeResult
150 cclose chan os.Error
151 done chan int
153 lock sync.Mutex
154 closed bool
156 io sync.Mutex
157 ioclosed bool
160 func (p *pipeHalf) rw(data []byte) (n int, err os.Error) {
161 // Run i/o operation.
162 // Check ioclosed flag under lock to make sure we're still allowed to do i/o.
163 p.io.Lock()
164 if p.ioclosed {
165 p.io.Unlock()
166 return 0, os.EINVAL
168 p.io.Unlock()
169 p.c1 <- data
170 res := <-p.c2
171 return res.n, res.err
174 func (p *pipeHalf) close(err os.Error) os.Error {
175 // Close pipe half.
176 // Only first call to close does anything.
177 p.lock.Lock()
178 if p.closed {
179 p.lock.Unlock()
180 return os.EINVAL
182 p.closed = true
183 p.lock.Unlock()
185 // First, send the close notification.
186 p.cclose <- err
188 // Runner is now responding to rw operations
189 // with os.EINVAL. Cut off future rw operations
190 // by setting ioclosed flag.
191 p.io.Lock()
192 p.ioclosed = true
193 p.io.Unlock()
195 // With ioclosed set, there will be no more rw operations
196 // working on the channels.
197 // Tell the runner we won't be bothering it anymore.
198 p.done <- 1
200 // Successfully torn down; can disable finalizer.
201 runtime.SetFinalizer(p, nil)
203 return nil
206 func (p *pipeHalf) finalizer() {
207 p.close(os.EINVAL)
211 // A PipeReader is the read half of a pipe.
212 type PipeReader struct {
213 pipeHalf
216 // Read implements the standard Read interface:
217 // it reads data from the pipe, blocking until a writer
218 // arrives or the write end is closed.
219 // If the write end is closed with an error, that error is
220 // returned as err; otherwise err is nil.
221 func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
222 return r.rw(data)
225 // Close closes the reader; subsequent writes to the
226 // write half of the pipe will return the error os.EPIPE.
227 func (r *PipeReader) Close() os.Error {
228 return r.CloseWithError(nil)
231 // CloseWithError closes the reader; subsequent writes
232 // to the write half of the pipe will return the error err.
233 func (r *PipeReader) CloseWithError(err os.Error) os.Error {
234 if err == nil {
235 err = os.EPIPE
237 return r.close(err)
240 // A PipeWriter is the write half of a pipe.
241 type PipeWriter struct {
242 pipeHalf
245 // Write implements the standard Write interface:
246 // it writes data to the pipe, blocking until readers
247 // have consumed all the data or the read end is closed.
248 // If the read end is closed with an error, that err is
249 // returned as err; otherwise err is os.EPIPE.
250 func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
251 return w.rw(data)
254 // Close closes the writer; subsequent reads from the
255 // read half of the pipe will return no bytes and os.EOF.
256 func (w *PipeWriter) Close() os.Error {
257 return w.CloseWithError(nil)
260 // CloseWithError closes the writer; subsequent reads from the
261 // read half of the pipe will return no bytes and the error err.
262 func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
263 if err == nil {
264 err = os.EOF
266 return w.close(err)
269 // Pipe creates a synchronous in-memory pipe.
270 // It can be used to connect code expecting an io.Reader
271 // with code expecting an io.Writer.
272 // Reads on one end are matched with writes on the other,
273 // copying data directly between the two; there is no internal buffering.
274 func Pipe() (*PipeReader, *PipeWriter) {
275 p := &pipe{
276 r1: make(chan []byte),
277 r2: make(chan pipeResult),
278 w1: make(chan []byte),
279 w2: make(chan pipeResult),
280 rclose: make(chan os.Error),
281 wclose: make(chan os.Error),
282 done: make(chan int),
284 go p.run()
286 // NOTE: Cannot use composite literal here:
287 // pipeHalf{c1: p.cr1, c2: p.cr2, cclose: p.crclose, cdone: p.cdone}
288 // because this implicitly copies the pipeHalf, which copies the inner mutex.
290 r := new(PipeReader)
291 r.c1 = p.r1
292 r.c2 = p.r2
293 r.cclose = p.rclose
294 r.done = p.done
295 runtime.SetFinalizer(r, (*PipeReader).finalizer)
297 w := new(PipeWriter)
298 w.c1 = p.w1
299 w.c2 = p.w2
300 w.cclose = p.wclose
301 w.done = p.done
302 runtime.SetFinalizer(w, (*PipeWriter).finalizer)
304 return r, w