2 // Copyright © 2011-2019 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
29 Logical network reader.
31 Read STOMP frames from the connection, create MessageData
32 structures from the received data, and push the MessageData to the client.
// reader reads STOMP frames from the connection, wraps each one in a
// MessageData and passes it on to the client.
34 func (c
*Connection
) reader() {
// Log the received frame: command, headers and the body passed through HexData.
40 c
.logx("RDR_RECEIVE_FRAME", f
.Command
, f
.Headers
, HexData(f
.Body
),
// Record the read error's text in the frame headers under the
// "connection_read_error" key, so it is part of the MessageData built next.
46 f
.Headers
= append(f
.Headers
, "connection_read_error", e
.Error())
47 md
:= MessageData
{Message(f
), e
}
// An EOF seen while the connection is no longer marked connected is logged
// as RDR_SHUTDOWN_EOF; presumably any other read error takes the
// RDR_CONN_GENL_ERR path below -- confirm against the full branch.
49 if e
== io
.EOF
&& !c
.isConnected() {
50 c
.log("RDR_SHUTDOWN_EOF", e
)
52 c
.log("RDR_CONN_GENL_ERR", e
)
// Update the connection's read metrics for this frame.
62 c
.mets
.tfr
+= 1 // Total frames read
63 // Headers already decoded
64 c
.mets
.tbr
+= m
.Size(false) // Total bytes read
66 //*************************************************************************
68 md
:= MessageData
{m
, e
}
// Fetch the subscription id header from the frame. A frame that reaches this
// point without one is treated as an internal error and panics.
72 sid
, ok
:= f
.Headers
.Contains(HK_SUBSCRIPTION
)
73 if !ok
{ // This should *NEVER* happen
74 panic(fmt
.Sprintf("stompngo INTERNAL ERROR: command:<%s> headers:<%v>",
75 f
.Command
, f
.Headers
))
// Look up the subscription for that id; sok reports whether it exists.
78 ps
, sok
:= c
.subs
[sid
] // This is a map of pointers .....
81 // The sub can be gone under some timing conditions. In that case
82 // we log it if possible, and continue (hope for the best).
83 c
.log("RDR_NOSUB", sid
, m
.Command
, m
.Headers
)
87 // The sub can also already be closed under some conditions.
88 // Again, we log that if possible, and continue
89 c
.log("RDR_CLSUB", sid
, m
.Command
, m
.Headers
)
92 // Handle subscription draining
// Log the message under RDR_DROPM together with ps.drmc
// (presumably a per-subscription dropped-message count -- confirm).
101 c
.logx("RDR_DROPM", ps
.drmc
, sid
, m
.Command
,
102 m
.Headers
, HexData(m
.Body
))
// Panic on a frame whose command this reader cannot handle; the message
// suggests the peer may not be speaking STOMP.
119 panic(fmt
.Sprintf("Broker SEVERE ERROR, not STOMP? command:<%s> headers:<%v>",
120 f
.Command
, f
.Headers
))
123 //*************************************************************************
// Shutdown handling: log the detection, mark the connection as no longer
// connected, then log the shutdown with the current time.
127 c
.log("RDR_SHUTDOWN detected")
134 c
.setConnected(false)
136 c
.log("RDR_SHUTDOWN", time
.Now())
140 Physical frame reader.
142 This parses a single STOMP frame from data off of the wire, and
143 returns a Frame, with a possible error.
145 Note: this functionality could hang or exhibit other erroneous behavior
146 if running against a non-compliant STOMP server.
// readFrame reads one STOMP frame from c.rdr: the command line, the header
// lines and then the body. It returns the frame together with any error.
148 func (c
*Connection
) readFrame() (f Frame
, e error
) {
// Start from an empty frame: no command, no headers, NULLBUFF as the body.
149 f
= Frame
{"", Headers
{}, NULLBUFF
}
151 // Read f.Command or line ends (maybe heartbeats)
153 s
, e
:= c
.rdr
.ReadString('\n')
// Read errors are passed through checkReadError before being acted on.
154 if c
.checkReadError(e
) != nil {
// The command is the line just read minus its final byte (ReadString keeps
// the '\n' delimiter in the returned string).
163 f
.Command
= s
[0 : len(s
)-1]
168 // Validate the command
// A command missing from validCmds yields an error made of EINVBCMD and the
// HexData rendering of the offending command bytes.
169 if _
, ok
:= validCmds
[f
.Command
]; !ok
{
170 ev
:= fmt
.Errorf("%s\n%s", EINVBCMD
, HexData([]byte(f
.Command
)))
// Read the next line of the frame (a header line, given the split below).
176 s
, e
:= c
.rdr
.ReadString('\n')
177 if c
.checkReadError(e
) != nil {
// Split at the first ':' only, so a header value may itself contain colons.
187 p
:= strings
.SplitN(s
, ":", 2)
191 // Always decode regardless of protocol level. See issue #47.
// Headers are kept as a flat sequence: name followed by value.
194 f
.Headers
= append(f
.Headers
, p
[0], p
[1])
// Check the collected headers against the connection's protocol level.
197 e
= checkHeaders(f
.Headers
, c
.Protocol())
// When a content-length header is present its trimmed value is parsed as an
// integer. The body is then read either up to a NUL or as exactly l bytes;
// presumably the parsed length decides which -- confirm against the full branch.
202 if v
, ok
:= f
.Headers
.Contains(HK_CONTENT_LENGTH
); ok
{
203 l
, e
:= strconv
.Atoi(strings
.TrimSpace(v
))
208 f
.Body
, e
= readUntilNul(c
)
210 f
.Body
, e
= readBody(c
, l
)
213 // content-length not present
214 f
.Body
, e
= readUntilNul(c
)
216 if c
.checkReadError(e
) != nil {
222 // End of read loop - set no deadline
// The error returned by SetReadDeadline is deliberately discarded.
224 _
= c
.netconn
.SetReadDeadline(c
.dld
.t0
)
// updateHBReads stores the current time, in Unix nanoseconds, in c.hbd.lr as
// the time of the latest good read.
229 func (c
*Connection
) updateHBReads() {
231 c
.hbd
.lr
= time
.Now().UnixNano() // Latest good read
// setReadDeadline sets a read deadline of now + c.dld.rdld on the network
// connection, but only when both c.dld.rde and c.dld.rds are true.
235 func (c
*Connection
) setReadDeadline() {
236 if c
.dld
.rde
&& c
.dld
.rds
{
// The error returned by SetReadDeadline is deliberately discarded.
237 _
= c
.netconn
.SetReadDeadline(time
.Now().Add(c
.dld
.rdld
))
// checkReadError examines the error e produced by a read and returns an
// error for the caller to test against nil. For the timeout case it invokes
// the read deadline callback, c.dld.dlnotify.
241 func (c
*Connection
) checkReadError(e error
) error
{
242 //c.log("checkReadError", e)
// Check whether e is a net.Error, the interface through which timeouts are reported.
246 ne
, ok
:= e
.(net
.Error
)
251 //c.log("is a timeout")
// Log, then hand the error to the deadline callback.
// NOTE(review): the meaning of the second argument (false) is not visible here -- confirm.
253 c
.log("invoking read deadline callback")
254 c
.dld
.dlnotify(e
, false)