2 // Copyright © 2011-2017 Guy M. Allard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
26 Connection handler, one time use during initial connect.
28 Handle broker response, react to version incompatibilities, set up session,
29 and if necessary initialize heart beats.
// connectHandler processes the broker's reply to the initial CONNECT frame.
// h holds the headers the client sent with CONNECT. The visible steps are:
// read one NUL-terminated reply from the network connection, parse it with
// connectResponse, store it in c.ConnectResponse, test for an ERROR reply,
// negotiate the protocol level, look for a session header, start heart beats
// when the protocol is at least SPL_11, and add the reply size to c.mets.tbr.
//
// NOTE(review): this copy of the function has gaps (error checks, branch
// bodies and the closing brace are not visible) - confirm every statement
// against the complete file before changing any logic here.
31 func (c
*Connection
) connectHandler(h Headers
) (e error
) {
32 //fmt.Printf("CHDB01\n")
// Wrap the raw network connection in a buffered reader kept on the connection.
33 c
.rdr
= bufio
.NewReader(c
.netconn
)
// Read up to and including the first 0 byte (the delimiter passed to ReadBytes).
// NOTE(review): no check of e is visible after this read - confirm one exists.
34 b
, e
:= c
.rdr
.ReadBytes(0)
38 //fmt.Printf("CHDB02\n")
// Parse the raw reply text into a frame.
39 f
, e
:= connectResponse(string(b
))
43 //fmt.Printf("CHDB03\n")
// Keep the broker's reply on the connection as a Message built from the
// frame's Command, Headers and Body.
45 c
.ConnectResponse
= &Message
{f
.Command
, f
.Headers
, f
.Body
}
// The broker answered CONNECT with an ERROR frame.
// NOTE(review): the body of this branch is not visible - presumably it
// returns an error; confirm.
46 if c
.ConnectResponse
.Command
== ERROR
{
49 //fmt.Printf("CHDB04\n")
// Negotiate the protocol level from the client's CONNECT headers (h) and the
// headers of the broker's reply.
51 e
= c
.setProtocolLevel(h
, c
.ConnectResponse
.Headers
)
55 //fmt.Printf("CHDB05\n")
// Look up the session header in the broker's reply.
// NOTE(review): what is done with s is not visible here.
57 if s
, ok
:= c
.ConnectResponse
.Headers
.Contains(HK_SESSION
); ok
{
// Heart beats are only initialized when the protocol is SPL_11 or above.
61 if c
.Protocol() >= SPL_11
{
62 e
= c
.initializeHeartBeats(h
)
67 //fmt.Printf("CHDB06\n")
// Add the size of the broker's reply to the tbr metric.
71 c
.mets
.tbr
+= c
.ConnectResponse
.Size(false)
76 Handle data from the wire after CONNECT is sent. Attempt to create a Frame from that data.
79 Called one time per connection at connection start.
// connectResponse parses s, the raw text received in reply to CONNECT, and
// returns it as a *Frame together with an error value. The visible code splits
// s at the first newline, accepts only CONNECTED and ERROR as the command,
// special-cases replies that carry no headers, and otherwise splits the
// remainder at the first blank line into a header section and a body.
//
// NOTE(review): this copy of the function has gaps - the declarations of f
// and w, the switch header, every return statement and the closing braces are
// not visible. Confirm against the complete file before changing logic.
81 func connectResponse(s
string) (*Frame
, error
) {
// Start with an empty, non-nil body.
85 f
.Body
= make([]uint8, 0)
// Split into at most two pieces at the first newline: c[0] is the text before
// it, c[1] is everything after it.
88 c
:= strings
.SplitN(s
, "\n", 2)
// Only CONNECTED and ERROR are accepted as the command of this reply.
// NOTE(review): the body of this branch is not visible.
93 if f
.Command
!= CONNECTED
&& f
.Command
!= ERROR
{
// NOTE(review): the switch header is not visible - presumably the switch is
// on c[1], the text after the command line; confirm.
98 case "\x00", "\n": // No headers, malformed bodies
99 f
.Body
= []uint8(c
[1])
101 case "\n\x00": // No headers, no body is OK
103 default: // Otherwise continue
// Split at the first blank line: b[0] is the text before it, b[1] the text after.
106 b
:= strings
.SplitN(c
[1], "\n\n", 2)
107 if len(b
) == 1 { // No Headers, b[0] == body
// Keep everything except the final element of w.
// NOTE(review): w is assigned on a line not visible here - presumably the
// dropped element is the trailing NUL terminator; confirm.
109 f
.Body
= w
[0 : len(w
)-1]
// A CONNECTED reply with a non-empty body is singled out here.
// NOTE(review): the body of this branch is not visible.
110 if f
.Command
== CONNECTED
&& len(f
.Body
) > 0 {
117 // b[0] - the headers
// Walk the header section one line at a time.
121 for _
, l
:= range strings
.Split(b
[0], "\n") {
// Split each header line at the first ':' only, so any later ':' stays in p[1].
122 p
:= strings
.SplitN(l
, ":", 2)
// NOTE(review): the guarding condition is not visible - presumably this runs
// when the line has no ':' separator, placing the offending text in the body.
124 f
.Body
= []uint8(p
[0]) // Bad feedback
// Headers is a flat list: the key is appended first, then its value.
127 f
.Headers
= append(f
.Headers
, p
[0], p
[1])
// Same trimming and body check as in the no-headers path above.
131 f
.Body
= w
[0 : len(w
)-1]
132 if f
.Command
== CONNECTED
&& len(f
.Body
) > 0 {
140 Check client version, one time use during initial connect.
// checkClientVersions inspects the client's CONNECT headers h. The visible
// code reads the accept-version header, treats an absent or empty value as a
// request for protocol 1.0, checks each comma-separated requested version
// against the supported list, and then tests for the presence of a host header.
//
// NOTE(review): this copy of the function has gaps - the declaration of ok,
// all return statements and the closing braces are not visible. Confirm
// against the complete file before changing logic.
142 func (c
*Connection
) checkClientVersions(h Headers
) (e error
) {
// w is the raw accept-version value supplied by the client.
143 w
:= h
.Value(HK_ACCEPT_VERSION
)
144 if w
== "" { // Not present, client wants 1.0
147 v
:= strings
.SplitN(w
, ",", -1) // -1: no limit on the number of pieces
149 for _
, sv
:= range v
{
150 if hasValue(supported
, sv
) {
151 ok
= true // We support at least one of the versions the client wants
// ok is reused here to record whether the host header is present.
// NOTE(review): the body of this branch is not visible.
157 if _
, ok
= h
.Contains(HK_HOST
); !ok
{
164 Set the protocol level for this new connection.
// setProtocolLevel decides the protocol level for a new connection. ch holds
// the client's CONNECT headers and sh the headers of the server's reply. It
// compares the client's accept-version value with the server's version value;
// the visible outcomes are returning nil with the level left at its default,
// returning EBADVERSVR, or storing the server's version in c.protocol.
//
// NOTE(review): this copy of the function has gaps - several branch bodies,
// closing braces and the final return are not visible. Confirm against the
// complete file before changing logic.
166 func (c
*Connection
) setProtocolLevel(ch
, sh Headers
) (e error
) {
// chw: the versions the client asked for (raw header value).
167 chw
:= ch
.Value(HK_ACCEPT_VERSION
)
// shr: the version the server replied with.
168 shr
:= sh
.Value(HK_VERSION
)
// Client and server name exactly the same version, and Supported accepts it.
// NOTE(review): the body of this branch is not visible.
170 if chw
== shr
&& Supported(shr
) {
174 if chw
== "" && shr
== "" { // Straight up 1.0
175 return nil // protocol level defaults to SPL_10
177 cv
:= strings
.SplitN(chw
, ",", -1) // Client requested versions
// Both sides named a version: check whether the server's choice is one the
// client asked for.
179 if chw
!= "" && shr
!= "" {
180 if hasValue(cv
, shr
) {
// NOTE(review): the condition guarding this return is not visible.
182 return EBADVERSVR
// Client and server agree, but we do not support it
190 if chw
!= "" && shr
== "" { // Client asked for something, server is pure 1.0
191 if hasValue(cv
, SPL_10
) {
192 return nil // protocol level defaults to SPL_10
// Record the server's version as this connection's protocol level.
196 c
.protocol
= shr
// Could be anything we support
201 Internal function, used only during CONNECT processing.
203 func hasValue(a
[]string, w
string) bool {
204 for _
, v
:= range a
{