libgo: update to Go 1.11
[official-gcc.git] / libgo / go / net / rpc / client.go
blobcad2d45e7f8d4f4dcdc5f60e7c3d342db54860e6
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 package rpc
7 import (
8 "bufio"
9 "encoding/gob"
10 "errors"
11 "io"
12 "log"
13 "net"
14 "net/http"
15 "sync"
// ServerError is the error type used for failures reported by the
// remote end of an RPC connection. Its value is the error text itself.
type ServerError string

// Error implements the error interface by returning the text of the
// server-side error unchanged.
func (se ServerError) Error() string {
	text := string(se)
	return text
}
// ErrShutdown is the error reported when an operation is attempted on,
// or interrupted by, a connection that is shut down or being closed.
var ErrShutdown = errors.New("connection is shut down")
// Call describes a single RPC invocation: what is being called, with
// which argument, where the reply goes, and how completion is signaled.
type Call struct {
	// ServiceMethod is the name of the service and method to call.
	ServiceMethod string
	// Args is the argument to the function (*struct).
	Args interface{}
	// Reply is the reply from the function (*struct).
	Reply interface{}
	// Error holds the error status once the call has completed.
	Error error
	// Done receives this Call when the call is complete.
	Done chan *Call
}
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client, and a Client may be used by
40 // multiple goroutines simultaneously.
41 type Client struct {
42 codec ClientCodec
44 reqMutex sync.Mutex // protects following
45 request Request
47 mutex sync.Mutex // protects following
48 seq uint64
49 pending map[uint64]*Call
50 closing bool // user has called Close
51 shutdown bool // server has told us to stop
54 // A ClientCodec implements writing of RPC requests and
55 // reading of RPC responses for the client side of an RPC session.
56 // The client calls WriteRequest to write a request to the connection
57 // and calls ReadResponseHeader and ReadResponseBody in pairs
58 // to read responses. The client calls Close when finished with the
59 // connection. ReadResponseBody may be called with a nil
60 // argument to force the body of the response to be read and then
61 // discarded.
62 // See NewClient's comment for information about concurrent access.
63 type ClientCodec interface {
64 WriteRequest(*Request, interface{}) error
65 ReadResponseHeader(*Response) error
66 ReadResponseBody(interface{}) error
68 Close() error
71 func (client *Client) send(call *Call) {
72 client.reqMutex.Lock()
73 defer client.reqMutex.Unlock()
75 // Register this call.
76 client.mutex.Lock()
77 if client.shutdown || client.closing {
78 client.mutex.Unlock()
79 call.Error = ErrShutdown
80 call.done()
81 return
83 seq := client.seq
84 client.seq++
85 client.pending[seq] = call
86 client.mutex.Unlock()
88 // Encode and send the request.
89 client.request.Seq = seq
90 client.request.ServiceMethod = call.ServiceMethod
91 err := client.codec.WriteRequest(&client.request, call.Args)
92 if err != nil {
93 client.mutex.Lock()
94 call = client.pending[seq]
95 delete(client.pending, seq)
96 client.mutex.Unlock()
97 if call != nil {
98 call.Error = err
99 call.done()
104 func (client *Client) input() {
105 var err error
106 var response Response
107 for err == nil {
108 response = Response{}
109 err = client.codec.ReadResponseHeader(&response)
110 if err != nil {
111 break
113 seq := response.Seq
114 client.mutex.Lock()
115 call := client.pending[seq]
116 delete(client.pending, seq)
117 client.mutex.Unlock()
119 switch {
120 case call == nil:
121 // We've got no pending call. That usually means that
122 // WriteRequest partially failed, and call was already
123 // removed; response is a server telling us about an
124 // error reading request body. We should still attempt
125 // to read error body, but there's no one to give it to.
126 err = client.codec.ReadResponseBody(nil)
127 if err != nil {
128 err = errors.New("reading error body: " + err.Error())
130 case response.Error != "":
131 // We've got an error response. Give this to the request;
132 // any subsequent requests will get the ReadResponseBody
133 // error if there is one.
134 call.Error = ServerError(response.Error)
135 err = client.codec.ReadResponseBody(nil)
136 if err != nil {
137 err = errors.New("reading error body: " + err.Error())
139 call.done()
140 default:
141 err = client.codec.ReadResponseBody(call.Reply)
142 if err != nil {
143 call.Error = errors.New("reading body " + err.Error())
145 call.done()
148 // Terminate pending calls.
149 client.reqMutex.Lock()
150 client.mutex.Lock()
151 client.shutdown = true
152 closing := client.closing
153 if err == io.EOF {
154 if closing {
155 err = ErrShutdown
156 } else {
157 err = io.ErrUnexpectedEOF
160 for _, call := range client.pending {
161 call.Error = err
162 call.done()
164 client.mutex.Unlock()
165 client.reqMutex.Unlock()
166 if debugLog && err != io.EOF && !closing {
167 log.Println("rpc: client protocol error:", err)
171 func (call *Call) done() {
172 select {
173 case call.Done <- call:
174 // ok
175 default:
176 // We don't want to block here. It is the caller's responsibility to make
177 // sure the channel has enough buffer space. See comment in Go().
178 if debugLog {
179 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
184 // NewClient returns a new Client to handle requests to the
185 // set of services at the other end of the connection.
186 // It adds a buffer to the write side of the connection so
187 // the header and payload are sent as a unit.
189 // The read and write halves of the connection are serialized independently,
190 // so no interlocking is required. However each half may be accessed
191 // concurrently so the implementation of conn should protect against
192 // concurrent reads or concurrent writes.
193 func NewClient(conn io.ReadWriteCloser) *Client {
194 encBuf := bufio.NewWriter(conn)
195 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
196 return NewClientWithCodec(client)
199 // NewClientWithCodec is like NewClient but uses the specified
200 // codec to encode requests and decode responses.
201 func NewClientWithCodec(codec ClientCodec) *Client {
202 client := &Client{
203 codec: codec,
204 pending: make(map[uint64]*Call),
206 go client.input()
207 return client
// gobClientCodec is the gob-based ClientCodec that NewClient installs.
type gobClientCodec struct {
	rwc    io.ReadWriteCloser // underlying connection; closed by Close
	dec    *gob.Decoder       // decodes response headers and bodies
	enc    *gob.Encoder       // encodes request headers and bodies into encBuf
	encBuf *bufio.Writer      // write buffer flushed after each request
}
217 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
218 if err = c.enc.Encode(r); err != nil {
219 return
221 if err = c.enc.Encode(body); err != nil {
222 return
224 return c.encBuf.Flush()
227 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
228 return c.dec.Decode(r)
231 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
232 return c.dec.Decode(body)
235 func (c *gobClientCodec) Close() error {
236 return c.rwc.Close()
239 // DialHTTP connects to an HTTP RPC server at the specified network address
240 // listening on the default HTTP RPC path.
241 func DialHTTP(network, address string) (*Client, error) {
242 return DialHTTPPath(network, address, DefaultRPCPath)
245 // DialHTTPPath connects to an HTTP RPC server
246 // at the specified network address and path.
247 func DialHTTPPath(network, address, path string) (*Client, error) {
248 var err error
249 conn, err := net.Dial(network, address)
250 if err != nil {
251 return nil, err
253 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
255 // Require successful HTTP response
256 // before switching to RPC protocol.
257 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
258 if err == nil && resp.Status == connected {
259 return NewClient(conn), nil
261 if err == nil {
262 err = errors.New("unexpected HTTP response: " + resp.Status)
264 conn.Close()
265 return nil, &net.OpError{
266 Op: "dial-http",
267 Net: network + " " + address,
268 Addr: nil,
269 Err: err,
273 // Dial connects to an RPC server at the specified network address.
274 func Dial(network, address string) (*Client, error) {
275 conn, err := net.Dial(network, address)
276 if err != nil {
277 return nil, err
279 return NewClient(conn), nil
282 // Close calls the underlying codec's Close method. If the connection is already
283 // shutting down, ErrShutdown is returned.
284 func (client *Client) Close() error {
285 client.mutex.Lock()
286 if client.closing {
287 client.mutex.Unlock()
288 return ErrShutdown
290 client.closing = true
291 client.mutex.Unlock()
292 return client.codec.Close()
295 // Go invokes the function asynchronously. It returns the Call structure representing
296 // the invocation. The done channel will signal when the call is complete by returning
297 // the same Call object. If done is nil, Go will allocate a new channel.
298 // If non-nil, done must be buffered or Go will deliberately crash.
299 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
300 call := new(Call)
301 call.ServiceMethod = serviceMethod
302 call.Args = args
303 call.Reply = reply
304 if done == nil {
305 done = make(chan *Call, 10) // buffered.
306 } else {
307 // If caller passes done != nil, it must arrange that
308 // done has enough buffer for the number of simultaneous
309 // RPCs that will be using that channel. If the channel
310 // is totally unbuffered, it's best not to run at all.
311 if cap(done) == 0 {
312 log.Panic("rpc: done channel is unbuffered")
315 call.Done = done
316 client.send(call)
317 return call
320 // Call invokes the named function, waits for it to complete, and returns its error status.
321 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
322 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
323 return call.Error