Rework writer deadline handling.
[stompngo.git] / reader.go
blobd27ff79bb8d92a001e977a939ade52c62fe56aa8
1 //
2 // Copyright © 2011-2017 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "net"
22 "strconv"
23 "strings"
24 "time"
28 Logical network reader.
30 Read STOMP frames from the connection, create MessageData
31 structures from the received data, and push the MessageData to the client.
33 func (c *Connection) reader() {
34 readLoop:
35 for {
36 f, e := c.readFrame()
38 select {
39 case _ = <-c.ssdc:
40 c.log("RDR_SHUTDOWN detected")
41 break readLoop
42 default:
45 c.log("RDR_RECEIVE_FRAME", f.Command, f.Headers, HexData(f.Body),
46 "RDR_RECEIVE_ERR", e)
47 if e != nil {
48 //debug.PrintStack()
49 f.Headers = append(f.Headers, "connection_read_error", e.Error())
50 md := MessageData{Message(f), e}
51 c.handleReadError(md)
52 c.log("RDR_CONN_ERR", e)
53 break readLoop
56 if f.Command == "" {
57 continue readLoop
60 m := Message(f)
61 c.mets.tfr += 1 // Total frames read
62 // Headers already decoded
63 c.mets.tbr += m.Size(false) // Total bytes read
65 //*************************************************************************
66 // Replacement START
67 md := MessageData{m, e}
68 switch f.Command {
70 case MESSAGE:
71 sid, ok := f.Headers.Contains(HK_SUBSCRIPTION)
72 if !ok { // This should *NEVER* happen
73 panic(fmt.Sprintf("stompngo INTERNAL ERROR: command:<%s> headers:<%v>",
74 f.Command, f.Headers))
76 c.subsLock.RLock()
77 ps, sok := c.subs[sid] // This is a map of pointers .....
79 if !sok {
80 // The sub can be gone under some timing conditions. In that case
81 // we log it of possible, and continue (hope for the best).
82 c.log("RDR_NOSUB", sid, m.Command, m.Headers)
83 goto csRUnlock
85 if ps.cs {
86 // The sub can also already be closed under some conditions.
87 // Again, we log that if possible, and continue
88 c.log("RDR_CLSUB", sid, m.Command, m.Headers)
89 goto csRUnlock
91 // Handle subscription draining
92 switch ps.drav {
93 case false:
94 ps.md <- md
95 default:
96 ps.drmc++
97 if ps.drmc > ps.dra {
98 c.log("RDR_DROPM", ps.drmc, sid, m.Command,
99 m.Headers, HexData(m.Body))
100 } else {
101 ps.md <- md
104 csRUnlock:
105 c.subsLock.RUnlock()
107 case ERROR:
108 fallthrough
110 case RECEIPT:
111 c.input <- md
113 default:
114 panic(fmt.Sprintf("Broker SEVERE ERROR, not STOMP? command:<%s> headers:<%v>",
115 f.Command, f.Headers))
117 // Replacement END
118 //*************************************************************************
120 select {
121 case _ = <-c.ssdc:
122 c.log("RDR_SHUTDOWN detected")
123 break readLoop
124 default:
126 c.log("RDR_RELOOP")
128 close(c.input)
129 c.log("RDR_SHUTDOWN", time.Now())
133 Physical frame reader.
135 This parses a single STOMP frame from data off of the wire, and
136 returns a Frame, with a possible error.
138 Note: this functionality could hang or exhibit other erroneous behavior
139 if running against a non-compliant STOMP server.
141 func (c *Connection) readFrame() (f Frame, e error) {
142 f = Frame{"", Headers{}, NULLBUFF}
144 // Read f.Command or line ends (maybe heartbeats)
145 c.setReadDeadline()
146 s, e := c.rdr.ReadString('\n')
147 if c.checkReadError(e) != nil {
148 return f, e
150 if s == "" {
151 return f, e
153 if c.hbd != nil {
154 c.updateHBReads()
156 f.Command = s[0 : len(s)-1]
157 if s == "\n" {
158 return f, e
161 // Validate the command
162 if _, ok := validCmds[f.Command]; !ok {
163 ev := fmt.Errorf("%s\n%s", EINVBCMD, HexData([]byte(f.Command)))
164 return f, ev
166 // Read f.Headers
167 for {
168 c.setReadDeadline()
169 s, e := c.rdr.ReadString('\n')
170 if c.checkReadError(e) != nil {
171 return f, e
173 if c.hbd != nil {
174 c.updateHBReads()
176 if s == "\n" {
177 break
179 s = s[0 : len(s)-1]
180 p := strings.SplitN(s, ":", 2)
181 if len(p) != 2 {
182 return f, EUNKHDR
184 if c.Protocol() != SPL_10 {
185 p[0] = decode(p[0])
186 p[1] = decode(p[1])
188 f.Headers = append(f.Headers, p[0], p[1])
191 e = checkHeaders(f.Headers, c.Protocol())
192 if e != nil {
193 return f, e
195 // Read f.Body
196 if v, ok := f.Headers.Contains(HK_CONTENT_LENGTH); ok {
197 l, e := strconv.Atoi(strings.TrimSpace(v))
198 if e != nil {
199 return f, e
201 if l == 0 {
202 f.Body, e = readUntilNul(c)
203 } else {
204 f.Body, e = readBody(c, l)
206 } else {
207 // content-length not present
208 f.Body, e = readUntilNul(c)
210 if c.checkReadError(e) != nil {
211 return f, e
213 if c.hbd != nil {
214 c.updateHBReads()
217 return f, e
220 func (c *Connection) updateHBReads() {
221 c.hbd.rdl.Lock()
222 c.hbd.lr = time.Now().UnixNano() // Latest good read
223 c.hbd.rdl.Unlock()
226 func (c *Connection) setReadDeadline() {
227 if c.dld.rde && c.dld.rds {
228 _ = c.netconn.SetReadDeadline(time.Now().Add(c.dld.rdld))
232 func (c *Connection) checkReadError(e error) error {
233 //c.log("checkReadError", e)
234 if e == nil {
235 return e
237 ne, ok := e.(net.Error)
238 if !ok {
239 return e
241 if ne.Timeout() {
242 //c.log("is a timeout")
243 if c.dld.dns {
244 c.log("invoking read deadline callback")
245 c.dld.dlnotify(e, false)
248 return e