Rework writer deadline handling.
[stompngo.git] / heartbeats.go
blobe85f5dc475f4315bbf4950a03525b98216db7b3f
1 //
2 // Copyright © 2011-2017 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "strconv"
22 "strings"
23 "time"
27 Initialize heart beats if necessary and possible.
29 Return an error, possibly nil, to mainline if initialization can not
30 complete. Establish heartbeat send and receive goroutines as necessary.
32 func (c *Connection) initializeHeartBeats(ch Headers) (e error) {
33 // Client wants Heartbeats ?
34 vc, ok := ch.Contains(HK_HEART_BEAT)
35 if !ok || vc == "0,0" {
36 return nil
38 // Server wants Heartbeats ?
39 vs, ok := c.ConnectResponse.Headers.Contains(HK_HEART_BEAT)
40 if !ok || vs == "0,0" {
41 return nil
43 // Work area, may or may not become connection heartbeat data
44 w := &heartBeatData{cx: 0, cy: 0, sx: 0, sy: 0,
45 hbs: true, hbr: true, // possible reset later
46 sti: 0, rti: 0,
47 ls: 0, lr: 0}
49 // Client specified values
50 cp := strings.Split(vc, ",")
51 if len(cp) != 2 { // S/B caught by the server first
52 return Error("invalid client heart-beat header: " + vc)
54 w.cx, e = strconv.ParseInt(cp[0], 10, 64)
55 if e != nil {
56 return Error("non-numeric cx heartbeat value: " + cp[0])
58 w.cy, e = strconv.ParseInt(cp[1], 10, 64)
59 if e != nil {
60 return Error("non-numeric cy heartbeat value: " + cp[1])
63 // Server specified values
64 sp := strings.Split(vs, ",")
65 if len(sp) != 2 {
66 return Error("invalid server heart-beat header: " + vs)
68 w.sx, e = strconv.ParseInt(sp[0], 10, 64)
69 if e != nil {
70 return Error("non-numeric sx heartbeat value: " + sp[0])
72 w.sy, e = strconv.ParseInt(sp[1], 10, 64)
73 if e != nil {
74 return Error("non-numeric sy heartbeat value: " + sp[1])
77 // Check for sending needed
78 if w.cx == 0 || w.sy == 0 {
79 w.hbs = false //
82 // Check for receiving needed
83 if w.sx == 0 || w.cy == 0 {
84 w.hbr = false //
87 // ========================================================================
89 if !w.hbs && !w.hbr {
90 return nil // none required
93 // ========================================================================
95 c.hbd = w // OK, we are doing some kind of heartbeating
96 ct := time.Now().UnixNano() // Prime current time
98 if w.hbs { // Finish sender parameters if required
99 sm := max(w.cx, w.sy) // ticker interval, ms
100 w.sti = 1000000 * sm // ticker interval, ns
101 w.ssd = make(chan struct{}) // add shutdown channel
102 w.ls = ct // Best guess at start
103 // fmt.Println("start send ticker")
104 go c.sendTicker()
107 if w.hbr { // Finish receiver parameters if required
108 rm := max(w.sx, w.cy) // ticker interval, ms
109 w.rti = 1000000 * rm // ticker interval, ns
110 w.rsd = make(chan struct{}) // add shutdown channel
111 w.lr = ct // Best guess at start
112 // fmt.Println("start receive ticker")
113 go c.receiveTicker()
115 return nil
119 The heart beat send ticker.
121 func (c *Connection) sendTicker() {
122 c.hbd.sc = 0
123 ticker := time.NewTicker(time.Duration(c.hbd.sti))
124 hbSend:
125 for {
126 select {
127 case <-ticker.C:
128 c.log("HeartBeat Send data")
129 // Send a heartbeat
130 f := Frame{"\n", Headers{}, NULLBUFF} // Heartbeat frame
131 r := make(chan error)
132 c.output <- wiredata{f, r}
133 e := <-r
135 c.hbd.sdl.Lock()
136 if e != nil {
137 fmt.Printf("Heartbeat Send Failure: %v\n", e)
138 c.Hbsf = true
139 } else {
140 c.Hbsf = false
141 c.hbd.sc++
143 c.hbd.sdl.Unlock()
145 case _ = <-c.hbd.ssd:
146 break hbSend
147 case _ = <-c.ssdc:
148 break hbSend
149 } // End of select
150 } // End of for
151 c.log("Heartbeat Send Ends", time.Now())
152 return
156 The heart beat receive ticker.
158 func (c *Connection) receiveTicker() {
159 c.hbd.rc = 0
160 var first, last int64
161 hbGet:
162 for {
163 ticker := time.NewTicker(time.Duration(c.hbd.rti - (last - first)))
164 select {
165 case ct := <-ticker.C:
166 first = time.Now().UnixNano()
167 ticker.Stop()
168 c.hbd.rdl.Lock()
169 flr := c.hbd.lr
170 ld := ct.UnixNano() - flr
171 c.log("HeartBeat Receive TIC", "TickerVal", ct.UnixNano(),
172 "LastReceive", flr, "Diff", ld)
173 if ld > (c.hbd.rti + (c.hbd.rti / 5)) { // swag plus to be tolerant
174 c.log("HeartBeat Receive Read is dirty")
175 c.Hbrf = true // Flag possible dirty connection
176 } else {
177 c.Hbrf = false // Reset
178 c.hbd.rc++
180 c.hbd.rdl.Unlock()
181 last = time.Now().UnixNano()
182 case _ = <-c.hbd.rsd:
183 break hbGet
184 case _ = <-c.ssdc:
185 break hbGet
186 } // End of select
187 } // End of for
188 c.log("Heartbeat Receive Ends", time.Now())
189 return