Version bump.
[stompngo.git] / reader.go
blobdcb6f7ce89d900455c0deb350b840802071d16e5
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "io"
22 "net"
23 "strconv"
24 "strings"
25 "time"
29 Logical network reader.
31 Read STOMP frames from the connection, create MessageData
32 structures from the received data, and push the MessageData to the client.
34 func (c *Connection) reader() {
35 readLoop:
36 for {
37 f, e := c.readFrame()
38 logLock.Lock()
39 if c.logger != nil {
40 c.logx("RDR_RECEIVE_FRAME", f.Command, f.Headers, HexData(f.Body),
41 "RDR_RECEIVE_ERR", e)
43 logLock.Unlock()
44 if e != nil {
45 //debug.PrintStack()
46 f.Headers = append(f.Headers, "connection_read_error", e.Error())
47 md := MessageData{Message(f), e}
48 c.handleReadError(md)
49 if e == io.EOF && !c.isConnected() {
50 c.log("RDR_SHUTDOWN_EOF", e)
51 } else {
52 c.log("RDR_CONN_GENL_ERR", e)
54 break readLoop
57 if f.Command == "" {
58 continue readLoop
61 m := Message(f)
62 c.mets.tfr += 1 // Total frames read
63 // Headers already decoded
64 c.mets.tbr += m.Size(false) // Total bytes read
66 //*************************************************************************
67 // Replacement START
68 md := MessageData{m, e}
69 switch f.Command {
71 case MESSAGE:
72 sid, ok := f.Headers.Contains(HK_SUBSCRIPTION)
73 if !ok { // This should *NEVER* happen
74 panic(fmt.Sprintf("stompngo INTERNAL ERROR: command:<%s> headers:<%v>",
75 f.Command, f.Headers))
77 c.subsLock.RLock()
78 ps, sok := c.subs[sid] // This is a map of pointers .....
80 if !sok {
81 // The sub can be gone under some timing conditions. In that case
82 // we log it of possible, and continue (hope for the best).
83 c.log("RDR_NOSUB", sid, m.Command, m.Headers)
84 goto csRUnlock
86 if ps.cs {
87 // The sub can also already be closed under some conditions.
88 // Again, we log that if possible, and continue
89 c.log("RDR_CLSUB", sid, m.Command, m.Headers)
90 goto csRUnlock
92 // Handle subscription draining
93 switch ps.drav {
94 case false:
95 ps.md <- md
96 default:
97 ps.drmc++
98 if ps.drmc > ps.dra {
99 logLock.Lock()
100 if c.logger != nil {
101 c.logx("RDR_DROPM", ps.drmc, sid, m.Command,
102 m.Headers, HexData(m.Body))
104 logLock.Unlock()
105 } else {
106 ps.md <- md
109 csRUnlock:
110 c.subsLock.RUnlock()
112 case ERROR:
113 fallthrough
115 case RECEIPT:
116 c.input <- md
118 default:
119 panic(fmt.Sprintf("Broker SEVERE ERROR, not STOMP? command:<%s> headers:<%v>",
120 f.Command, f.Headers))
122 // Replacement END
123 //*************************************************************************
125 select {
126 case _ = <-c.ssdc:
127 c.log("RDR_SHUTDOWN detected")
128 break readLoop
129 default:
131 c.log("RDR_RELOOP")
133 close(c.input)
134 c.setConnected(false)
135 c.sysAbort()
136 c.log("RDR_SHUTDOWN", time.Now())
140 Physical frame reader.
142 This parses a single STOMP frame from data off of the wire, and
143 returns a Frame, with a possible error.
145 Note: this functionality could hang or exhibit other erroneous behavior
146 if running against a non-compliant STOMP server.
148 func (c *Connection) readFrame() (f Frame, e error) {
149 f = Frame{"", Headers{}, NULLBUFF}
151 // Read f.Command or line ends (maybe heartbeats)
152 c.setReadDeadline()
153 s, e := c.rdr.ReadString('\n')
154 if c.checkReadError(e) != nil {
155 return f, e
157 if s == "" {
158 return f, e
160 if c.hbd != nil {
161 c.updateHBReads()
163 f.Command = s[0 : len(s)-1]
164 if s == "\n" {
165 return f, e
168 // Validate the command
169 if _, ok := validCmds[f.Command]; !ok {
170 ev := fmt.Errorf("%s\n%s", EINVBCMD, HexData([]byte(f.Command)))
171 return f, ev
173 // Read f.Headers
174 for {
175 c.setReadDeadline()
176 s, e := c.rdr.ReadString('\n')
177 if c.checkReadError(e) != nil {
178 return f, e
180 if c.hbd != nil {
181 c.updateHBReads()
183 if s == "\n" {
184 break
186 s = s[0 : len(s)-1]
187 p := strings.SplitN(s, ":", 2)
188 if len(p) != 2 {
189 return f, EUNKHDR
191 // Always decode regardless of protocol level. See issue #47.
192 p[0] = decode(p[0])
193 p[1] = decode(p[1])
194 f.Headers = append(f.Headers, p[0], p[1])
197 e = checkHeaders(f.Headers, c.Protocol())
198 if e != nil {
199 return f, e
201 // Read f.Body
202 if v, ok := f.Headers.Contains(HK_CONTENT_LENGTH); ok {
203 l, e := strconv.Atoi(strings.TrimSpace(v))
204 if e != nil {
205 return f, e
207 if l == 0 {
208 f.Body, e = readUntilNul(c)
209 } else {
210 f.Body, e = readBody(c, l)
212 } else {
213 // content-length not present
214 f.Body, e = readUntilNul(c)
216 if c.checkReadError(e) != nil {
217 return f, e
219 if c.hbd != nil {
220 c.updateHBReads()
222 // End of read loop - set no deadline
223 if c.dld.rde {
224 _ = c.netconn.SetReadDeadline(c.dld.t0)
226 return f, e
229 func (c *Connection) updateHBReads() {
230 c.hbd.rdl.Lock()
231 c.hbd.lr = time.Now().UnixNano() // Latest good read
232 c.hbd.rdl.Unlock()
235 func (c *Connection) setReadDeadline() {
236 if c.dld.rde && c.dld.rds {
237 _ = c.netconn.SetReadDeadline(time.Now().Add(c.dld.rdld))
241 func (c *Connection) checkReadError(e error) error {
242 //c.log("checkReadError", e)
243 if e == nil {
244 return e
246 ne, ok := e.(net.Error)
247 if !ok {
248 return e
250 if ne.Timeout() {
251 //c.log("is a timeout")
252 if c.dld.dns {
253 c.log("invoking read deadline callback")
254 c.dld.dlnotify(e, false)
257 return e