libgo: update to Go 1.11
[official-gcc.git] / libgo / go / net / pipe.go
blob9177fc403643e3b9de936811a9c1873efa5a34cf
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 package net
7 import (
8 "io"
9 "sync"
10 "time"
// pipeDeadline models one resettable timeout. Expiry is communicated by
// closing the cancel channel rather than by delivering a value.
type pipeDeadline struct {
	// mu guards both timer and cancel.
	mu sync.Mutex

	// timer is the currently armed timer, or nil when none is pending.
	timer *time.Timer

	// cancel must never be nil.
	cancel chan struct{}
}
20 func makePipeDeadline() pipeDeadline {
21 return pipeDeadline{cancel: make(chan struct{})}
24 // set sets the point in time when the deadline will time out.
25 // A timeout event is signaled by closing the channel returned by waiter.
26 // Once a timeout has occurred, the deadline can be refreshed by specifying a
27 // t value in the future.
29 // A zero value for t prevents timeout.
30 func (d *pipeDeadline) set(t time.Time) {
31 d.mu.Lock()
32 defer d.mu.Unlock()
34 if d.timer != nil && !d.timer.Stop() {
35 <-d.cancel // Wait for the timer callback to finish and close cancel
37 d.timer = nil
39 // Time is zero, then there is no deadline.
40 closed := isClosedChan(d.cancel)
41 if t.IsZero() {
42 if closed {
43 d.cancel = make(chan struct{})
45 return
48 // Time in the future, setup a timer to cancel in the future.
49 if dur := time.Until(t); dur > 0 {
50 if closed {
51 d.cancel = make(chan struct{})
53 d.timer = time.AfterFunc(dur, func() {
54 close(d.cancel)
56 return
59 // Time in the past, so close immediately.
60 if !closed {
61 close(d.cancel)
65 // wait returns a channel that is closed when the deadline is exceeded.
66 func (d *pipeDeadline) wait() chan struct{} {
67 d.mu.Lock()
68 defer d.mu.Unlock()
69 return d.cancel
// isClosedChan reports whether a receive on c completes without blocking.
// It never blocks: if nothing can be received right away it returns false.
func isClosedChan(c <-chan struct{}) bool {
	received := false
	select {
	case <-c:
		received = true
	default:
		// Nothing to receive; leave the result as false.
	}
	return received
}
// timeoutError is the error value returned when a pipe deadline has passed.
type timeoutError struct{}

// Error returns the fixed message for an exceeded deadline.
func (timeoutError) Error() string {
	return "deadline exceeded"
}

// Timeout always reports true.
func (timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (timeoutError) Temporary() bool {
	return true
}
// pipeAddr is the placeholder address reported by both ends of a pipe.
type pipeAddr struct{}

// Network returns the network name, which is always "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the textual address, which is always "pipe".
func (pipeAddr) String() string {
	return "pipe"
}
92 type pipe struct {
93 wrMu sync.Mutex // Serialize Write operations
95 // Used by local Read to interact with remote Write.
96 // Successful receive on rdRx is always followed by send on rdTx.
97 rdRx <-chan []byte
98 rdTx chan<- int
100 // Used by local Write to interact with remote Read.
101 // Successful send on wrTx is always followed by receive on wrRx.
102 wrTx chan<- []byte
103 wrRx <-chan int
105 once sync.Once // Protects closing localDone
106 localDone chan struct{}
107 remoteDone <-chan struct{}
109 readDeadline pipeDeadline
110 writeDeadline pipeDeadline
113 // Pipe creates a synchronous, in-memory, full duplex
114 // network connection; both ends implement the Conn interface.
115 // Reads on one end are matched with writes on the other,
116 // copying data directly between the two; there is no internal
117 // buffering.
118 func Pipe() (Conn, Conn) {
119 cb1 := make(chan []byte)
120 cb2 := make(chan []byte)
121 cn1 := make(chan int)
122 cn2 := make(chan int)
123 done1 := make(chan struct{})
124 done2 := make(chan struct{})
126 p1 := &pipe{
127 rdRx: cb1, rdTx: cn1,
128 wrTx: cb2, wrRx: cn2,
129 localDone: done1, remoteDone: done2,
130 readDeadline: makePipeDeadline(),
131 writeDeadline: makePipeDeadline(),
133 p2 := &pipe{
134 rdRx: cb2, rdTx: cn2,
135 wrTx: cb1, wrRx: cn1,
136 localDone: done2, remoteDone: done1,
137 readDeadline: makePipeDeadline(),
138 writeDeadline: makePipeDeadline(),
140 return p1, p2
143 func (*pipe) LocalAddr() Addr { return pipeAddr{} }
144 func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
146 func (p *pipe) Read(b []byte) (int, error) {
147 n, err := p.read(b)
148 if err != nil && err != io.EOF && err != io.ErrClosedPipe {
149 err = &OpError{Op: "read", Net: "pipe", Err: err}
151 return n, err
154 func (p *pipe) read(b []byte) (n int, err error) {
155 switch {
156 case isClosedChan(p.localDone):
157 return 0, io.ErrClosedPipe
158 case isClosedChan(p.remoteDone):
159 return 0, io.EOF
160 case isClosedChan(p.readDeadline.wait()):
161 return 0, timeoutError{}
164 select {
165 case bw := <-p.rdRx:
166 nr := copy(b, bw)
167 p.rdTx <- nr
168 return nr, nil
169 case <-p.localDone:
170 return 0, io.ErrClosedPipe
171 case <-p.remoteDone:
172 return 0, io.EOF
173 case <-p.readDeadline.wait():
174 return 0, timeoutError{}
178 func (p *pipe) Write(b []byte) (int, error) {
179 n, err := p.write(b)
180 if err != nil && err != io.ErrClosedPipe {
181 err = &OpError{Op: "write", Net: "pipe", Err: err}
183 return n, err
186 func (p *pipe) write(b []byte) (n int, err error) {
187 switch {
188 case isClosedChan(p.localDone):
189 return 0, io.ErrClosedPipe
190 case isClosedChan(p.remoteDone):
191 return 0, io.ErrClosedPipe
192 case isClosedChan(p.writeDeadline.wait()):
193 return 0, timeoutError{}
196 p.wrMu.Lock() // Ensure entirety of b is written together
197 defer p.wrMu.Unlock()
198 for once := true; once || len(b) > 0; once = false {
199 select {
200 case p.wrTx <- b:
201 nw := <-p.wrRx
202 b = b[nw:]
203 n += nw
204 case <-p.localDone:
205 return n, io.ErrClosedPipe
206 case <-p.remoteDone:
207 return n, io.ErrClosedPipe
208 case <-p.writeDeadline.wait():
209 return n, timeoutError{}
212 return n, nil
215 func (p *pipe) SetDeadline(t time.Time) error {
216 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
217 return io.ErrClosedPipe
219 p.readDeadline.set(t)
220 p.writeDeadline.set(t)
221 return nil
224 func (p *pipe) SetReadDeadline(t time.Time) error {
225 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
226 return io.ErrClosedPipe
228 p.readDeadline.set(t)
229 return nil
232 func (p *pipe) SetWriteDeadline(t time.Time) error {
233 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
234 return io.ErrClosedPipe
236 p.writeDeadline.set(t)
237 return nil
240 func (p *pipe) Close() error {
241 p.once.Do(func() { close(p.localDone) })
242 return nil