1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
type ServerError string

// Error returns the server-supplied message unchanged, which makes
// ServerError satisfy the error interface.
func (e ServerError) Error() string {
	return string(e)
}
// ErrShutdown is the sentinel error reported when an operation is
// attempted on a connection that is shut down or being closed.
var ErrShutdown = errors.New("connection is shut down")
// Call represents an active RPC.
type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Strobes when call is complete.
}
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client, and a Client may be used by
40 // multiple goroutines simultaneously.
46 mutex sync
.Mutex
// protects following
49 pending
map[uint64]*Call
50 closing
bool // user has called Close
51 shutdown
bool // server has told us to stop
54 // A ClientCodec implements writing of RPC requests and
55 // reading of RPC responses for the client side of an RPC session.
56 // The client calls WriteRequest to write a request to the connection
57 // and calls ReadResponseHeader and ReadResponseBody in pairs
58 // to read responses. The client calls Close when finished with the
59 // connection. ReadResponseBody may be called with a nil
60 // argument to force the body of the response to be read and then
62 type ClientCodec
interface {
63 // WriteRequest must be safe for concurrent use by multiple goroutines.
64 WriteRequest(*Request
, interface{}) error
65 ReadResponseHeader(*Response
) error
66 ReadResponseBody(interface{}) error
71 func (client
*Client
) send(call
*Call
) {
73 defer client
.sending
.Unlock()
75 // Register this call.
77 if client
.shutdown || client
.closing
{
78 call
.Error
= ErrShutdown
85 client
.pending
[seq
] = call
88 // Encode and send the request.
89 client
.request
.Seq
= seq
90 client
.request
.ServiceMethod
= call
.ServiceMethod
91 err
:= client
.codec
.WriteRequest(&client
.request
, call
.Args
)
94 call
= client
.pending
[seq
]
95 delete(client
.pending
, seq
)
104 func (client
*Client
) input() {
106 var response Response
108 response
= Response
{}
109 err
= client
.codec
.ReadResponseHeader(&response
)
115 call
:= client
.pending
[seq
]
116 delete(client
.pending
, seq
)
117 client
.mutex
.Unlock()
121 // We've got no pending call. That usually means that
122 // WriteRequest partially failed, and call was already
123 // removed; response is a server telling us about an
124 // error reading request body. We should still attempt
125 // to read error body, but there's no one to give it to.
126 err
= client
.codec
.ReadResponseBody(nil)
128 err
= errors
.New("reading error body: " + err
.Error())
130 case response
.Error
!= "":
131 // We've got an error response. Give this to the request;
132 // any subsequent requests will get the ReadResponseBody
133 // error if there is one.
134 call
.Error
= ServerError(response
.Error
)
135 err
= client
.codec
.ReadResponseBody(nil)
137 err
= errors
.New("reading error body: " + err
.Error())
141 err
= client
.codec
.ReadResponseBody(call
.Reply
)
143 call
.Error
= errors
.New("reading body " + err
.Error())
148 // Terminate pending calls.
149 client
.sending
.Lock()
151 client
.shutdown
= true
152 closing
:= client
.closing
157 err
= io
.ErrUnexpectedEOF
160 for _
, call
:= range client
.pending
{
164 client
.mutex
.Unlock()
165 client
.sending
.Unlock()
166 if debugLog
&& err
!= io
.EOF
&& !closing
{
167 log
.Println("rpc: client protocol error:", err
)
171 func (call
*Call
) done() {
173 case call
.Done
<- call
:
176 // We don't want to block here. It is the caller's responsibility to make
177 // sure the channel has enough buffer space. See comment in Go().
179 log
.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
184 // NewClient returns a new Client to handle requests to the
185 // set of services at the other end of the connection.
186 // It adds a buffer to the write side of the connection so
187 // the header and payload are sent as a unit.
188 func NewClient(conn io
.ReadWriteCloser
) *Client
{
189 encBuf
:= bufio
.NewWriter(conn
)
190 client
:= &gobClientCodec
{conn
, gob
.NewDecoder(conn
), gob
.NewEncoder(encBuf
), encBuf
}
191 return NewClientWithCodec(client
)
194 // NewClientWithCodec is like NewClient but uses the specified
195 // codec to encode requests and decode responses.
196 func NewClientWithCodec(codec ClientCodec
) *Client
{
199 pending
: make(map[uint64]*Call
),
205 type gobClientCodec
struct {
206 rwc io
.ReadWriteCloser
212 func (c
*gobClientCodec
) WriteRequest(r
*Request
, body
interface{}) (err error
) {
213 if err
= c
.enc
.Encode(r
); err
!= nil {
216 if err
= c
.enc
.Encode(body
); err
!= nil {
219 return c
.encBuf
.Flush()
222 func (c
*gobClientCodec
) ReadResponseHeader(r
*Response
) error
{
223 return c
.dec
.Decode(r
)
226 func (c
*gobClientCodec
) ReadResponseBody(body
interface{}) error
{
227 return c
.dec
.Decode(body
)
230 func (c
*gobClientCodec
) Close() error
{
234 // DialHTTP connects to an HTTP RPC server at the specified network address
235 // listening on the default HTTP RPC path.
236 func DialHTTP(network
, address
string) (*Client
, error
) {
237 return DialHTTPPath(network
, address
, DefaultRPCPath
)
240 // DialHTTPPath connects to an HTTP RPC server
241 // at the specified network address and path.
242 func DialHTTPPath(network
, address
, path
string) (*Client
, error
) {
244 conn
, err
:= net
.Dial(network
, address
)
248 io
.WriteString(conn
, "CONNECT "+path
+" HTTP/1.0\n\n")
250 // Require successful HTTP response
251 // before switching to RPC protocol.
252 resp
, err
:= http
.ReadResponse(bufio
.NewReader(conn
), &http
.Request
{Method
: "CONNECT"})
253 if err
== nil && resp
.Status
== connected
{
254 return NewClient(conn
), nil
257 err
= errors
.New("unexpected HTTP response: " + resp
.Status
)
260 return nil, &net
.OpError
{
262 Net
: network
+ " " + address
,
268 // Dial connects to an RPC server at the specified network address.
269 func Dial(network
, address
string) (*Client
, error
) {
270 conn
, err
:= net
.Dial(network
, address
)
274 return NewClient(conn
), nil
277 func (client
*Client
) Close() error
{
280 client
.mutex
.Unlock()
283 client
.closing
= true
284 client
.mutex
.Unlock()
285 return client
.codec
.Close()
288 // Go invokes the function asynchronously. It returns the Call structure representing
289 // the invocation. The done channel will signal when the call is complete by returning
290 // the same Call object. If done is nil, Go will allocate a new channel.
291 // If non-nil, done must be buffered or Go will deliberately crash.
292 func (client
*Client
) Go(serviceMethod
string, args
interface{}, reply
interface{}, done
chan *Call
) *Call
{
294 call
.ServiceMethod
= serviceMethod
298 done
= make(chan *Call
, 10) // buffered.
300 // If caller passes done != nil, it must arrange that
301 // done has enough buffer for the number of simultaneous
302 // RPCs that will be using that channel. If the channel
303 // is totally unbuffered, it's best not to run at all.
305 log
.Panic("rpc: done channel is unbuffered")
313 // Call invokes the named function, waits for it to complete, and returns its error status.
314 func (client
*Client
) Call(serviceMethod
string, args
interface{}, reply
interface{}) error
{
315 call
:= <-client
.Go(serviceMethod
, args
, reply
, make(chan *Call
, 1)).Done