2 // Copyright © 2011-2017 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
25 // Exported Connection methods
28 Connected returns the current connection status.
30 func (c
*Connection
) Connected() bool {
35 Session returns the broker assigned session id.
37 func (c
*Connection
) Session() string {
42 Protocol returns the current connection protocol level.
44 func (c
*Connection
) Protocol() string {
49 SetLogger enables a client defined logger for this connection.
51 Set to "nil" to disable logging.
55 l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
58 func (c
*Connection
) SetLogger(l
*log
.Logger
) {
65 SendTickerInterval returns any heartbeat send ticker interval in ms. A return
66 value of zero means no heartbeats are being sent.
68 func (c
*Connection
) SendTickerInterval() int64 {
72 return c
.hbd
.sti
/ 1000000
76 ReceiveTickerInterval returns any heartbeat receive ticker interval in ms.
77 A return value of zero means no heartbeats are being received.
79 func (c
*Connection
) ReceiveTickerInterval() int64 {
83 return c
.hbd
.rti
/ 1000000
87 SendTickerCount returns any heartbeat send ticker count. A return value of
88 zero usually indicates no send heartbeats are enabled.
90 func (c
*Connection
) SendTickerCount() int64 {
98 ReceiveTickerCount returns any heartbeat receive ticker count. A return
99 value of zero usually indicates no read heartbeats are enabled.
101 func (c
*Connection
) ReceiveTickerCount() int64 {
109 FramesRead returns a count of the number of frames read on the connection.
111 func (c
*Connection
) FramesRead() int64 {
116 BytesRead returns a count of the number of bytes read on the connection.
118 func (c
*Connection
) BytesRead() int64 {
123 FramesWritten returns a count of the number of frames written on the connection.
125 func (c
*Connection
) FramesWritten() int64 {
130 BytesWritten returns a count of the number of bytes written on the connection.
132 func (c
*Connection
) BytesWritten() int64 {
137 Running returns a time duration since connection start.
139 func (c
*Connection
) Running() time
.Duration
{
140 return time
.Since(c
.mets
.st
)
144 SubChanCap returns the current subscribe channel capacity.
146 func (c
*Connection
) SubChanCap() int {
151 SetSubChanCap sets a new subscribe channel capacity, to be used during future
152 SUBSCRIBE operations.
154 func (c
*Connection
) SetSubChanCap(nc
int) {
159 // Unexported Connection methods
162 Log data if possible.
164 func (c
*Connection
) log(v
...interface{}) {
166 defer logLock
.Unlock()
170 _
, fn
, ld
, ok
:= runtime
.Caller(1)
173 c
.logger
.Printf("%s %s %d %v\n", c
.session
, fn
, ld
, v
)
175 c
.logger
.Print(c
.session
, v
)
183 func (c
*Connection
) shutdownHeartBeats() {
184 // Shutdown heartbeats if necessary
203 func (c
*Connection
) shutdown() {
204 c
.log("SHUTDOWN", "starts")
205 c
.shutdownHeartBeats()
206 // Close all individual subscribe channels
207 // This is a write lock
209 for key
:= range c
.subs
{
210 close(c
.subs
[key
].md
)
211 c
.subs
[key
].cs
= true
215 c
.log("SHUTDOWN", "ends")
222 func (c
*Connection
) handleReadError(md MessageData
) {
223 c
.log("HDRERR", "starts", md
)
224 c
.shutdownHeartBeats() // We are done here
225 // Notify any general subscriber of error
227 // Notify all individual subscribers of error
228 // This is a read lock
231 for key
:= range c
.subs
{
236 // Try to catch the writer
238 c
.log("HDRERR", "ends")
239 // Let further shutdown logic proceed normally.