1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
type ServerError string

// Error implements the error interface. It returns the message carried by
// the ServerError unchanged.
func (e ServerError) Error() string {
	return string(e)
}
// ErrShutdown is the error reported when an operation is attempted on a
// connection that is shut down or closing.
var ErrShutdown = errors.New("connection is shut down")
// Call represents an active RPC.
type Call struct {
	ServiceMethod string     // The name of the service and method to call.
	Args          any        // The argument to the function (*struct).
	Reply         any        // The reply from the function (*struct).
	Error         error      // After completion, the error status.
	Done          chan *Call // Receives *Call when Go is complete.
}
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client, and a Client may be used by
40 // multiple goroutines simultaneously.
44 reqMutex sync
.Mutex
// protects following
47 mutex sync
.Mutex
// protects following
49 pending
map[uint64]*Call
50 closing
bool // user has called Close
51 shutdown
bool // server has told us to stop
54 // A ClientCodec implements writing of RPC requests and
55 // reading of RPC responses for the client side of an RPC session.
56 // The client calls WriteRequest to write a request to the connection
57 // and calls ReadResponseHeader and ReadResponseBody in pairs
58 // to read responses. The client calls Close when finished with the
59 // connection. ReadResponseBody may be called with a nil
60 // argument to force the body of the response to be read and then
62 // See NewClient's comment for information about concurrent access.
63 type ClientCodec
interface {
64 WriteRequest(*Request
, any
) error
65 ReadResponseHeader(*Response
) error
66 ReadResponseBody(any
) error
71 func (client
*Client
) send(call
*Call
) {
72 client
.reqMutex
.Lock()
73 defer client
.reqMutex
.Unlock()
75 // Register this call.
77 if client
.shutdown || client
.closing
{
79 call
.Error
= ErrShutdown
85 client
.pending
[seq
] = call
88 // Encode and send the request.
89 client
.request
.Seq
= seq
90 client
.request
.ServiceMethod
= call
.ServiceMethod
91 err
:= client
.codec
.WriteRequest(&client
.request
, call
.Args
)
94 call
= client
.pending
[seq
]
95 delete(client
.pending
, seq
)
104 func (client
*Client
) input() {
106 var response Response
108 response
= Response
{}
109 err
= client
.codec
.ReadResponseHeader(&response
)
115 call
:= client
.pending
[seq
]
116 delete(client
.pending
, seq
)
117 client
.mutex
.Unlock()
121 // We've got no pending call. That usually means that
122 // WriteRequest partially failed, and call was already
123 // removed; response is a server telling us about an
124 // error reading request body. We should still attempt
125 // to read error body, but there's no one to give it to.
126 err
= client
.codec
.ReadResponseBody(nil)
128 err
= errors
.New("reading error body: " + err
.Error())
130 case response
.Error
!= "":
131 // We've got an error response. Give this to the request;
132 // any subsequent requests will get the ReadResponseBody
133 // error if there is one.
134 call
.Error
= ServerError(response
.Error
)
135 err
= client
.codec
.ReadResponseBody(nil)
137 err
= errors
.New("reading error body: " + err
.Error())
141 err
= client
.codec
.ReadResponseBody(call
.Reply
)
143 call
.Error
= errors
.New("reading body " + err
.Error())
148 // Terminate pending calls.
149 client
.reqMutex
.Lock()
151 client
.shutdown
= true
152 closing
:= client
.closing
157 err
= io
.ErrUnexpectedEOF
160 for _
, call
:= range client
.pending
{
164 client
.mutex
.Unlock()
165 client
.reqMutex
.Unlock()
166 if debugLog
&& err
!= io
.EOF
&& !closing
{
167 log
.Println("rpc: client protocol error:", err
)
171 func (call
*Call
) done() {
173 case call
.Done
<- call
:
176 // We don't want to block here. It is the caller's responsibility to make
177 // sure the channel has enough buffer space. See comment in Go().
179 log
.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
184 // NewClient returns a new Client to handle requests to the
185 // set of services at the other end of the connection.
186 // It adds a buffer to the write side of the connection so
187 // the header and payload are sent as a unit.
189 // The read and write halves of the connection are serialized independently,
190 // so no interlocking is required. However each half may be accessed
191 // concurrently so the implementation of conn should protect against
192 // concurrent reads or concurrent writes.
193 func NewClient(conn io
.ReadWriteCloser
) *Client
{
194 encBuf
:= bufio
.NewWriter(conn
)
195 client
:= &gobClientCodec
{conn
, gob
.NewDecoder(conn
), gob
.NewEncoder(encBuf
), encBuf
}
196 return NewClientWithCodec(client
)
199 // NewClientWithCodec is like NewClient but uses the specified
200 // codec to encode requests and decode responses.
201 func NewClientWithCodec(codec ClientCodec
) *Client
{
204 pending
: make(map[uint64]*Call
),
// gobClientCodec is the codec NewClient constructs. The field order matters:
// NewClient fills the struct with an unkeyed literal.
type gobClientCodec struct {
	rwc    io.ReadWriteCloser // the underlying connection
	dec    *gob.Decoder       // decodes responses read from the connection
	enc    *gob.Encoder       // encodes requests into encBuf
	encBuf *bufio.Writer      // write-side buffer, flushed after each request
}
217 func (c
*gobClientCodec
) WriteRequest(r
*Request
, body any
) (err error
) {
218 if err
= c
.enc
.Encode(r
); err
!= nil {
221 if err
= c
.enc
.Encode(body
); err
!= nil {
224 return c
.encBuf
.Flush()
227 func (c
*gobClientCodec
) ReadResponseHeader(r
*Response
) error
{
228 return c
.dec
.Decode(r
)
231 func (c
*gobClientCodec
) ReadResponseBody(body any
) error
{
232 return c
.dec
.Decode(body
)
235 func (c
*gobClientCodec
) Close() error
{
239 // DialHTTP connects to an HTTP RPC server at the specified network address
240 // listening on the default HTTP RPC path.
241 func DialHTTP(network
, address
string) (*Client
, error
) {
242 return DialHTTPPath(network
, address
, DefaultRPCPath
)
245 // DialHTTPPath connects to an HTTP RPC server
246 // at the specified network address and path.
247 func DialHTTPPath(network
, address
, path
string) (*Client
, error
) {
248 conn
, err
:= net
.Dial(network
, address
)
252 io
.WriteString(conn
, "CONNECT "+path
+" HTTP/1.0\n\n")
254 // Require successful HTTP response
255 // before switching to RPC protocol.
256 resp
, err
:= http
.ReadResponse(bufio
.NewReader(conn
), &http
.Request
{Method
: "CONNECT"})
257 if err
== nil && resp
.Status
== connected
{
258 return NewClient(conn
), nil
261 err
= errors
.New("unexpected HTTP response: " + resp
.Status
)
264 return nil, &net
.OpError
{
266 Net
: network
+ " " + address
,
272 // Dial connects to an RPC server at the specified network address.
273 func Dial(network
, address
string) (*Client
, error
) {
274 conn
, err
:= net
.Dial(network
, address
)
278 return NewClient(conn
), nil
281 // Close calls the underlying codec's Close method. If the connection is already
282 // shutting down, ErrShutdown is returned.
283 func (client
*Client
) Close() error
{
286 client
.mutex
.Unlock()
289 client
.closing
= true
290 client
.mutex
.Unlock()
291 return client
.codec
.Close()
294 // Go invokes the function asynchronously. It returns the Call structure representing
295 // the invocation. The done channel will signal when the call is complete by returning
296 // the same Call object. If done is nil, Go will allocate a new channel.
297 // If non-nil, done must be buffered or Go will deliberately crash.
298 func (client
*Client
) Go(serviceMethod
string, args any
, reply any
, done
chan *Call
) *Call
{
300 call
.ServiceMethod
= serviceMethod
304 done
= make(chan *Call
, 10) // buffered.
306 // If caller passes done != nil, it must arrange that
307 // done has enough buffer for the number of simultaneous
308 // RPCs that will be using that channel. If the channel
309 // is totally unbuffered, it's best not to run at all.
311 log
.Panic("rpc: done channel is unbuffered")
319 // Call invokes the named function, waits for it to complete, and returns its error status.
320 func (client
*Client
) Call(serviceMethod
string, args any
, reply any
) error
{
321 call
:= <-client
.Go(serviceMethod
, args
, reply
, make(chan *Call
, 1)).Done