2 // Copyright © 2011-2017 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
28 Logical network reader.
30 Read STOMP frames from the connection, create MessageData
31 structures from the received data, and push the MessageData to the client.
33 func (c
*Connection
) reader() {
40 c
.log("RDR_SHUTDOWN detected")
45 c
.log("RDR_RECEIVE_FRAME", f
.Command
, f
.Headers
, HexData(f
.Body
),
49 f
.Headers
= append(f
.Headers
, "connection_read_error", e
.Error())
50 md
:= MessageData
{Message(f
), e
}
52 c
.log("RDR_CONN_ERR", e
)
61 c
.mets
.tfr
+= 1 // Total frames read
62 // Headers already decoded
63 c
.mets
.tbr
+= m
.Size(false) // Total bytes read
65 //*************************************************************************
67 md
:= MessageData
{m
, e
}
71 sid
, ok
:= f
.Headers
.Contains(HK_SUBSCRIPTION
)
72 if !ok
{ // This should *NEVER* happen
73 panic(fmt
.Sprintf("stompngo INTERNAL ERROR: command:<%s> headers:<%v>",
74 f
.Command
, f
.Headers
))
77 ps
, sok
:= c
.subs
[sid
] // This is a map of pointers .....
80 // The sub can be gone under some timing conditions. In that case
81 // we log it if possible, and continue (hope for the best).
82 c
.log("RDR_NOSUB", sid
, m
.Command
, m
.Headers
)
86 // The sub can also already be closed under some conditions.
87 // Again, we log that if possible, and continue
88 c
.log("RDR_CLSUB", sid
, m
.Command
, m
.Headers
)
91 // Handle subscription draining
98 c
.log("RDR_DROPM", ps
.drmc
, sid
, m
.Command
,
99 m
.Headers
, HexData(m
.Body
))
114 panic(fmt
.Sprintf("Broker SEVERE ERROR, not STOMP? command:<%s> headers:<%v>",
115 f
.Command
, f
.Headers
))
118 //*************************************************************************
122 c
.log("RDR_SHUTDOWN detected")
129 c
.log("RDR_SHUTDOWN", time
.Now())
133 Physical frame reader.
135 This parses a single STOMP frame from data off of the wire, and
136 returns a Frame, with a possible error.
138 Note: this functionality could hang or exhibit other erroneous behavior
139 if running against a non-compliant STOMP server.
141 func (c
*Connection
) readFrame() (f Frame
, e error
) {
142 f
= Frame
{"", Headers
{}, NULLBUFF
}
144 // Read f.Command or line ends (maybe heartbeats)
146 s
, e
:= c
.rdr
.ReadString('\n')
147 if c
.checkReadError(e
) != nil {
156 f
.Command
= s
[0 : len(s
)-1]
161 // Validate the command
162 if _
, ok
:= validCmds
[f
.Command
]; !ok
{
163 ev
:= fmt
.Errorf("%s\n%s", EINVBCMD
, HexData([]byte(f
.Command
)))
169 s
, e
:= c
.rdr
.ReadString('\n')
170 if c
.checkReadError(e
) != nil {
180 p
:= strings
.SplitN(s
, ":", 2)
184 if c
.Protocol() != SPL_10
{
188 f
.Headers
= append(f
.Headers
, p
[0], p
[1])
191 e
= checkHeaders(f
.Headers
, c
.Protocol())
196 if v
, ok
:= f
.Headers
.Contains(HK_CONTENT_LENGTH
); ok
{
197 l
, e
:= strconv
.Atoi(strings
.TrimSpace(v
))
202 f
.Body
, e
= readUntilNul(c
)
204 f
.Body
, e
= readBody(c
, l
)
207 // content-length not present
208 f
.Body
, e
= readUntilNul(c
)
210 if c
.checkReadError(e
) != nil {
220 func (c
*Connection
) updateHBReads() {
222 c
.hbd
.lr
= time
.Now().UnixNano() // Latest good read
226 func (c
*Connection
) setReadDeadline() {
227 if c
.dld
.rde
&& c
.dld
.rds
{
228 _
= c
.netconn
.SetReadDeadline(time
.Now().Add(c
.dld
.rdld
))
232 func (c
*Connection
) checkReadError(e error
) error
{
233 //c.log("checkReadError", e)
237 ne
, ok
:= e
.(net
.Error
)
242 //c.log("is a timeout")
244 c
.log("invoking read deadline callback")
245 c
.dld
.dlnotify(e
, false)