1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
13 // pipeDeadline is an abstraction for handling timeouts.
14 type pipeDeadline
struct {
15 mu sync
.Mutex
// Guards timer and cancel
17 cancel
chan struct{} // Must be non-nil
20 func makePipeDeadline() pipeDeadline
{
21 return pipeDeadline
{cancel
: make(chan struct{})}
24 // set sets the point in time when the deadline will time out.
25 // A timeout event is signaled by closing the channel returned by waiter.
26 // Once a timeout has occurred, the deadline can be refreshed by specifying a
27 // t value in the future.
29 // A zero value for t prevents timeout.
30 func (d
*pipeDeadline
) set(t time
.Time
) {
34 if d
.timer
!= nil && !d
.timer
.Stop() {
35 <-d
.cancel
// Wait for the timer callback to finish and close cancel
39 // Time is zero, then there is no deadline.
40 closed := isClosedChan(d
.cancel
)
43 d
.cancel
= make(chan struct{})
48 // Time in the future, setup a timer to cancel in the future.
49 if dur
:= time
.Until(t
); dur
> 0 {
51 d
.cancel
= make(chan struct{})
53 d
.timer
= time
.AfterFunc(dur
, func() {
59 // Time in the past, so close immediately.
65 // wait returns a channel that is closed when the deadline is exceeded.
66 func (d
*pipeDeadline
) wait() chan struct{} {
72 func isClosedChan(c
<-chan struct{}) bool {
81 type timeoutError
struct{}
83 func (timeoutError
) Error() string { return "deadline exceeded" }
84 func (timeoutError
) Timeout() bool { return true }
85 func (timeoutError
) Temporary() bool { return true }
87 type pipeAddr
struct{}
89 func (pipeAddr
) Network() string { return "pipe" }
90 func (pipeAddr
) String() string { return "pipe" }
93 wrMu sync
.Mutex
// Serialize Write operations
95 // Used by local Read to interact with remote Write.
96 // Successful receive on rdRx is always followed by send on rdTx.
100 // Used by local Write to interact with remote Read.
101 // Successful send on wrTx is always followed by receive on wrRx.
105 once sync
.Once
// Protects closing localDone
106 localDone
chan struct{}
107 remoteDone
<-chan struct{}
109 readDeadline pipeDeadline
110 writeDeadline pipeDeadline
113 // Pipe creates a synchronous, in-memory, full duplex
114 // network connection; both ends implement the Conn interface.
115 // Reads on one end are matched with writes on the other,
116 // copying data directly between the two; there is no internal
118 func Pipe() (Conn
, Conn
) {
119 cb1
:= make(chan []byte)
120 cb2
:= make(chan []byte)
121 cn1
:= make(chan int)
122 cn2
:= make(chan int)
123 done1
:= make(chan struct{})
124 done2
:= make(chan struct{})
127 rdRx
: cb1
, rdTx
: cn1
,
128 wrTx
: cb2
, wrRx
: cn2
,
129 localDone
: done1
, remoteDone
: done2
,
130 readDeadline
: makePipeDeadline(),
131 writeDeadline
: makePipeDeadline(),
134 rdRx
: cb2
, rdTx
: cn2
,
135 wrTx
: cb1
, wrRx
: cn1
,
136 localDone
: done2
, remoteDone
: done1
,
137 readDeadline
: makePipeDeadline(),
138 writeDeadline
: makePipeDeadline(),
143 func (*pipe
) LocalAddr() Addr
{ return pipeAddr
{} }
144 func (*pipe
) RemoteAddr() Addr
{ return pipeAddr
{} }
146 func (p
*pipe
) Read(b
[]byte) (int, error
) {
148 if err
!= nil && err
!= io
.EOF
&& err
!= io
.ErrClosedPipe
{
149 err
= &OpError
{Op
: "read", Net
: "pipe", Err
: err
}
154 func (p
*pipe
) read(b
[]byte) (n
int, err error
) {
156 case isClosedChan(p
.localDone
):
157 return 0, io
.ErrClosedPipe
158 case isClosedChan(p
.remoteDone
):
160 case isClosedChan(p
.readDeadline
.wait()):
161 return 0, timeoutError
{}
170 return 0, io
.ErrClosedPipe
173 case <-p
.readDeadline
.wait():
174 return 0, timeoutError
{}
178 func (p
*pipe
) Write(b
[]byte) (int, error
) {
180 if err
!= nil && err
!= io
.ErrClosedPipe
{
181 err
= &OpError
{Op
: "write", Net
: "pipe", Err
: err
}
186 func (p
*pipe
) write(b
[]byte) (n
int, err error
) {
188 case isClosedChan(p
.localDone
):
189 return 0, io
.ErrClosedPipe
190 case isClosedChan(p
.remoteDone
):
191 return 0, io
.ErrClosedPipe
192 case isClosedChan(p
.writeDeadline
.wait()):
193 return 0, timeoutError
{}
196 p
.wrMu
.Lock() // Ensure entirety of b is written together
197 defer p
.wrMu
.Unlock()
198 for once
:= true; once ||
len(b
) > 0; once
= false {
205 return n
, io
.ErrClosedPipe
207 return n
, io
.ErrClosedPipe
208 case <-p
.writeDeadline
.wait():
209 return n
, timeoutError
{}
215 func (p
*pipe
) SetDeadline(t time
.Time
) error
{
216 if isClosedChan(p
.localDone
) ||
isClosedChan(p
.remoteDone
) {
217 return io
.ErrClosedPipe
219 p
.readDeadline
.set(t
)
220 p
.writeDeadline
.set(t
)
224 func (p
*pipe
) SetReadDeadline(t time
.Time
) error
{
225 if isClosedChan(p
.localDone
) ||
isClosedChan(p
.remoteDone
) {
226 return io
.ErrClosedPipe
228 p
.readDeadline
.set(t
)
232 func (p
*pipe
) SetWriteDeadline(t time
.Time
) error
{
233 if isClosedChan(p
.localDone
) ||
isClosedChan(p
.remoteDone
) {
234 return io
.ErrClosedPipe
236 p
.writeDeadline
.set(t
)
240 func (p
*pipe
) Close() error
{
241 p
.once
.Do(func() { close(p
.localDone
) })