Rework writer deadline handling.
[stompngo.git] / connection.go
blob e483fd7a21b79ba17cb4d0bfd9d47e788f9ba23b
1 //
2 // Copyright © 2011-2017 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "log"
21 "runtime"
22 "time"
25 // Exported Connection methods
28 Connected returns the current connection status.
30 func (c *Connection) Connected() bool {
31 return c.connected
35 Session returns the broker assigned session id.
37 func (c *Connection) Session() string {
38 return c.session
42 Protocol returns the current connection protocol level.
44 func (c *Connection) Protocol() string {
45 return c.protocol
49 SetLogger enables a client defined logger for this connection.
51 Set to "nil" to disable logging.
53 Example:
54 // Start logging
55 l := log.New(os.Stdout, "", log.Ldate|log.Lmicroseconds)
56 c.SetLogger(l)
58 func (c *Connection) SetLogger(l *log.Logger) {
59 logLock.Lock()
60 c.logger = l
61 logLock.Unlock()
65 SendTickerInterval returns any heartbeat send ticker interval in ms. A return
66 value of zero means no heartbeats are being sent.
68 func (c *Connection) SendTickerInterval() int64 {
69 if c.hbd == nil {
70 return 0
72 return c.hbd.sti / 1000000
76 ReceiveTickerInterval returns any heartbeat receive ticker interval in ms.
77 A return value of zero means no heartbeats are being received.
79 func (c *Connection) ReceiveTickerInterval() int64 {
80 if c.hbd == nil {
81 return 0
83 return c.hbd.rti / 1000000
87 SendTickerCount returns any heartbeat send ticker count. A return value of
88 zero usually indicates no send heartbeats are enabled.
90 func (c *Connection) SendTickerCount() int64 {
91 if c.hbd == nil {
92 return 0
94 return c.hbd.sc
98 ReceiveTickerCount returns any heartbeat receive ticker count. A return
99 value of zero usually indicates no read heartbeats are enabled.
101 func (c *Connection) ReceiveTickerCount() int64 {
102 if c.hbd == nil {
103 return 0
105 return c.hbd.rc
109 FramesRead returns a count of the number of frames read on the connection.
111 func (c *Connection) FramesRead() int64 {
112 return c.mets.tfr
116 BytesRead returns a count of the number of bytes read on the connection.
118 func (c *Connection) BytesRead() int64 {
119 return c.mets.tbr
123 FramesWritten returns a count of the number of frames written on the connection.
125 func (c *Connection) FramesWritten() int64 {
126 return c.mets.tfw
130 BytesWritten returns a count of the number of bytes written on the connection.
132 func (c *Connection) BytesWritten() int64 {
133 return c.mets.tbw
137 Running returns a time duration since connection start.
139 func (c *Connection) Running() time.Duration {
140 return time.Since(c.mets.st)
144 SubChanCap returns the current subscribe channel capacity.
146 func (c *Connection) SubChanCap() int {
147 return c.scc
151 SetSubChanCap sets a new subscribe channel capacity, to be used during future
152 SUBSCRIBE operations.
154 func (c *Connection) SetSubChanCap(nc int) {
155 c.scc = nc
156 return
159 // Unexported Connection methods
162 Log data if possible.
164 func (c *Connection) log(v ...interface{}) {
165 logLock.Lock()
166 defer logLock.Unlock()
167 if c.logger == nil {
168 return
170 _, fn, ld, ok := runtime.Caller(1)
172 if ok {
173 c.logger.Printf("%s %s %d %v\n", c.session, fn, ld, v)
174 } else {
175 c.logger.Print(c.session, v)
177 return
181 Shutdown heartbeats
183 func (c *Connection) shutdownHeartBeats() {
184 // Shutdown heartbeats if necessary
185 if c.hbd != nil {
186 c.hbd.clk.Lock()
187 if !c.hbd.ssdn {
188 if c.hbd.hbs {
189 close(c.hbd.ssd)
191 if c.hbd.hbr {
192 close(c.hbd.rsd)
194 c.hbd.ssdn = true
196 c.hbd.clk.Unlock()
201 Shutdown logic.
203 func (c *Connection) shutdown() {
204 c.log("SHUTDOWN", "starts")
205 c.shutdownHeartBeats()
206 // Close all individual subscribe channels
207 // This is a write lock
208 c.subsLock.Lock()
209 for key := range c.subs {
210 close(c.subs[key].md)
211 c.subs[key].cs = true
213 c.connected = false
214 c.subsLock.Unlock()
215 c.log("SHUTDOWN", "ends")
216 return
220 Read error handler.
222 func (c *Connection) handleReadError(md MessageData) {
223 c.log("HDRERR", "starts", md)
224 c.shutdownHeartBeats() // We are done here
225 // Notify any general subscriber of error
226 c.input <- md
227 // Notify all individual subscribers of error
228 // This is a read lock
229 c.subsLock.RLock()
230 if c.connected {
231 for key := range c.subs {
232 c.subs[key].md <- md
235 c.subsLock.RUnlock()
236 // Try to catch the writer
237 close(c.wtrsdc)
238 c.log("HDRERR", "ends")
239 // Let further shutdown logic proceed normally.
240 return