1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
14 // pipeDeadline is an abstraction for handling timeouts.
15 type pipeDeadline
struct {
16 mu sync
.Mutex
// Guards timer and cancel
18 cancel
chan struct{} // Must be non-nil
21 func makePipeDeadline() pipeDeadline
{
22 return pipeDeadline
{cancel
: make(chan struct{})}
25 // set sets the point in time when the deadline will time out.
26 // A timeout event is signaled by closing the channel returned by waiter.
27 // Once a timeout has occurred, the deadline can be refreshed by specifying a
28 // t value in the future.
30 // A zero value for t prevents timeout.
31 func (d
*pipeDeadline
) set(t time
.Time
) {
35 if d
.timer
!= nil && !d
.timer
.Stop() {
36 <-d
.cancel
// Wait for the timer callback to finish and close cancel
40 // Time is zero, then there is no deadline.
41 closed := isClosedChan(d
.cancel
)
44 d
.cancel
= make(chan struct{})
49 // Time in the future, setup a timer to cancel in the future.
50 if dur
:= time
.Until(t
); dur
> 0 {
52 d
.cancel
= make(chan struct{})
54 d
.timer
= time
.AfterFunc(dur
, func() {
60 // Time in the past, so close immediately.
66 // wait returns a channel that is closed when the deadline is exceeded.
67 func (d
*pipeDeadline
) wait() chan struct{} {
73 func isClosedChan(c
<-chan struct{}) bool {
82 type pipeAddr
struct{}
84 func (pipeAddr
) Network() string { return "pipe" }
85 func (pipeAddr
) String() string { return "pipe" }
88 wrMu sync
.Mutex
// Serialize Write operations
90 // Used by local Read to interact with remote Write.
91 // Successful receive on rdRx is always followed by send on rdTx.
95 // Used by local Write to interact with remote Read.
96 // Successful send on wrTx is always followed by receive on wrRx.
100 once sync
.Once
// Protects closing localDone
101 localDone
chan struct{}
102 remoteDone
<-chan struct{}
104 readDeadline pipeDeadline
105 writeDeadline pipeDeadline
108 // Pipe creates a synchronous, in-memory, full duplex
109 // network connection; both ends implement the Conn interface.
110 // Reads on one end are matched with writes on the other,
111 // copying data directly between the two; there is no internal
113 func Pipe() (Conn
, Conn
) {
114 cb1
:= make(chan []byte)
115 cb2
:= make(chan []byte)
116 cn1
:= make(chan int)
117 cn2
:= make(chan int)
118 done1
:= make(chan struct{})
119 done2
:= make(chan struct{})
122 rdRx
: cb1
, rdTx
: cn1
,
123 wrTx
: cb2
, wrRx
: cn2
,
124 localDone
: done1
, remoteDone
: done2
,
125 readDeadline
: makePipeDeadline(),
126 writeDeadline
: makePipeDeadline(),
129 rdRx
: cb2
, rdTx
: cn2
,
130 wrTx
: cb1
, wrRx
: cn1
,
131 localDone
: done2
, remoteDone
: done1
,
132 readDeadline
: makePipeDeadline(),
133 writeDeadline
: makePipeDeadline(),
138 func (*pipe
) LocalAddr() Addr
{ return pipeAddr
{} }
139 func (*pipe
) RemoteAddr() Addr
{ return pipeAddr
{} }
141 func (p
*pipe
) Read(b
[]byte) (int, error
) {
143 if err
!= nil && err
!= io
.EOF
&& err
!= io
.ErrClosedPipe
{
144 err
= &OpError
{Op
: "read", Net
: "pipe", Err
: err
}
149 func (p
*pipe
) read(b
[]byte) (n
int, err error
) {
151 case isClosedChan(p
.localDone
):
152 return 0, io
.ErrClosedPipe
153 case isClosedChan(p
.remoteDone
):
155 case isClosedChan(p
.readDeadline
.wait()):
156 return 0, os
.ErrDeadlineExceeded
165 return 0, io
.ErrClosedPipe
168 case <-p
.readDeadline
.wait():
169 return 0, os
.ErrDeadlineExceeded
173 func (p
*pipe
) Write(b
[]byte) (int, error
) {
175 if err
!= nil && err
!= io
.ErrClosedPipe
{
176 err
= &OpError
{Op
: "write", Net
: "pipe", Err
: err
}
181 func (p
*pipe
) write(b
[]byte) (n
int, err error
) {
183 case isClosedChan(p
.localDone
):
184 return 0, io
.ErrClosedPipe
185 case isClosedChan(p
.remoteDone
):
186 return 0, io
.ErrClosedPipe
187 case isClosedChan(p
.writeDeadline
.wait()):
188 return 0, os
.ErrDeadlineExceeded
191 p
.wrMu
.Lock() // Ensure entirety of b is written together
192 defer p
.wrMu
.Unlock()
193 for once
:= true; once ||
len(b
) > 0; once
= false {
200 return n
, io
.ErrClosedPipe
202 return n
, io
.ErrClosedPipe
203 case <-p
.writeDeadline
.wait():
204 return n
, os
.ErrDeadlineExceeded
210 func (p
*pipe
) SetDeadline(t time
.Time
) error
{
211 if isClosedChan(p
.localDone
) ||
isClosedChan(p
.remoteDone
) {
212 return io
.ErrClosedPipe
214 p
.readDeadline
.set(t
)
215 p
.writeDeadline
.set(t
)
219 func (p
*pipe
) SetReadDeadline(t time
.Time
) error
{
220 if isClosedChan(p
.localDone
) ||
isClosedChan(p
.remoteDone
) {
221 return io
.ErrClosedPipe
223 p
.readDeadline
.set(t
)
227 func (p
*pipe
) SetWriteDeadline(t time
.Time
) error
{
228 if isClosedChan(p
.localDone
) ||
isClosedChan(p
.remoteDone
) {
229 return io
.ErrClosedPipe
231 p
.writeDeadline
.set(t
)
235 func (p
*pipe
) Close() error
{
236 p
.once
.Do(func() { close(p
.localDone
) })