2 // Copyright © 2016-2018 Guy M. Alluard
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Package sngecomm provides common functionality used in the stompngo_examples
32 "github.com/gmallard/stompngo"
33 "github.com/gmallard/stompngo/senv"
37 llu
= log
.New(os
.Stdout
, "UTIL ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)
41 // Provide connect headers
42 func ConnectHeaders() stompngo
.Headers
{
43 h
:= stompngo
.Headers
{}
50 h
= h
.Add("passcode", pc
)
54 if p
!= stompngo
.SPL_10
{ // 1.1 and 1.2
55 h
= h
.Add("accept-version", p
).Add("host", senv
.Vhost())
56 hb
:= senv
.Heartbeats()
58 h
= h
.Add("heart-beat", hb
)
66 // Show connection metrics.
67 func ShowStats(exampid
, tag
string, conn
*stompngo
.Connection
) {
68 r
:= conn
.FramesRead()
69 br
:= conn
.BytesRead()
70 w
:= conn
.FramesWritten()
71 bw
:= conn
.BytesWritten()
72 s
:= conn
.Running().Seconds()
73 n
:= conn
.Running().Nanoseconds()
74 llu
.Printf("%stag:%s frame_read_count:%v\n", exampid
, tag
, r
)
75 llu
.Printf("%stag:%s bytes_read:%v\n", exampid
, tag
, br
)
76 llu
.Printf("%stag:%s frame_write_count:%v\n", exampid
, tag
, w
)
77 llu
.Printf("%stag:%s bytes_written:%v\n", exampid
, tag
, bw
)
78 llu
.Printf("%stag:%s current_duration(ns):%v\n", exampid
, tag
, n
)
80 llu
.Printf("%stag:%s current_duration(sec):%20.6f\n", exampid
, tag
, s
)
81 llu
.Printf("%stag:%s frame_reads/sec:%20.6f\n", exampid
, tag
, float64(r
)/s
)
82 llu
.Printf("%stag:%s bytes_read/sec:%20.6f\n", exampid
, tag
, float64(br
)/s
)
83 llu
.Printf("%stag:%s frame_writes/sec:%20.6f\n", exampid
, tag
, float64(w
)/s
)
84 llu
.Printf("%stag:%s bytes_written/sec:%20.6f\n", exampid
, tag
, float64(bw
)/s
)
87 // Get a value between min amd max
88 func ValueBetween(min
, max
int64, fact
float64) int64 {
89 rt
, _
:= rand
.Int(rand
.Reader
, big
.NewInt(max
-min
)) // Ignore errors here
90 return int64(fact
* float64(min
+rt
.Int64()))
93 // Dump a TLS Configuration Struct
94 func DumpTLSConfig(exampid
string, c
*tls
.Config
, n
*tls
.Conn
) {
95 llu
.Printf("%s TLSConfig:\n", exampid
)
96 llu
.Printf("%s Rand:%#v\n", exampid
, c
.Rand
)
98 llu
.Printf("%s Time:%v\n", exampid
, c
.Time())
100 llu
.Printf("%s Time:%v\n", exampid
, nil)
102 llu
.Printf("%s Certificates:%#v\n", exampid
, c
.Certificates
)
103 llu
.Printf("%s NameToCertificate:%#v\n", exampid
, c
.NameToCertificate
)
104 llu
.Printf("%s RootCAs:%#v\n", exampid
, c
.RootCAs
)
105 llu
.Printf("%s NextProtos:%v\n", exampid
, c
.NextProtos
)
106 llu
.Printf("%s ServerName:%v\n", exampid
, c
.ServerName
)
107 llu
.Printf("%s ClientAuth:%v\n", exampid
, c
.ClientAuth
)
108 llu
.Printf("%s ClientCAs:%v#\n", exampid
, c
.ClientCAs
)
109 llu
.Printf("%s CipherSuites:%#v\n", exampid
, c
.CipherSuites
)
110 llu
.Printf("%s PreferServerCipherSuites:%v\n", exampid
, c
.PreferServerCipherSuites
)
111 llu
.Printf("%s SessionTicketsDisabled:%v\n", exampid
, c
.SessionTicketsDisabled
)
112 llu
.Printf("%s SessionTicketKey:%#v\n", exampid
, c
.SessionTicketKey
)
114 // Idea Embelluished From:
115 // https://groups.google.com/forum/#!topic/golang-nuts/TMNdOxugbTY
116 cs
:= n
.ConnectionState()
117 llu
.Printf("%s HandshakeComplete:%v\n", exampid
, cs
.HandshakeComplete
)
118 llu
.Printf("%s DidResume:%v\n", exampid
, cs
.DidResume
)
119 llu
.Printf("%s CipherSuite:%d(0x%X)\n", exampid
, cs
.CipherSuite
, cs
.CipherSuite
)
120 llu
.Printf("%s NegotiatedProtocol:%v\n", exampid
, cs
.NegotiatedProtocol
)
121 llu
.Printf("%s NegotiatedProtocolIsMutual:%v\n", exampid
, cs
.NegotiatedProtocolIsMutual
)
122 // llu.Printf("%s ServerName:%v\n", exampid, cs.ServerName) // Server side only
123 // Portions of any Peer Certificates present
124 certs
:= cs
.PeerCertificates
125 if certs
== nil ||
len(certs
) < 1 {
126 llu
.Printf("Could not get server's certificate from the TLS connection.\n")
130 llu
.Printf("%s There is %d Server Cert:\n", exampid
, len(certs
))
132 llu
.Printf("%s There are %d Server Certs:\n", exampid
, len(certs
))
135 for i
, cert
:= range certs
{
136 llu
.Printf("%s Certificate chain:%d\n", exampid
, i
)
137 llu
.Printf("%s Common Name:%s\n", exampid
, cert
.Subject
.CommonName
)
139 llu
.Printf("%s Subject Alternative Names (DNSNames):\n", exampid
)
140 for idx
, dnsn
:= range cert
.DNSNames
{
141 llu
.Printf("%s \tNumber:%d, DNS Name:%s\n", exampid
, idx
+1, dnsn
)
144 llu
.Printf("%s Subject Alternative Names (Emailaddresses):\n", exampid
)
145 for idx
, enn
:= range cert
.EmailAddresses
{
146 llu
.Printf("%s \tNumber:%d, DNS Name:%s\n", exampid
, idx
+1, enn
)
149 llu
.Printf("%s Subject Alternative Names (IPAddresses):\n", exampid
)
150 for idx
, ipadn
:= range cert
.IPAddresses
{
151 llu
.Printf("%s \tNumber:%d, DNS Name:%v\n", exampid
, idx
+1, ipadn
)
154 llu
.Printf("%s Valid Not Before:%s\n", exampid
, cert
.NotBefore
.Local().String())
155 llu
.Printf("%s Valid Not After:%s\n", exampid
, cert
.NotAfter
.Local().String())
156 llu
.Println(strings
.Repeat("=", 80))
161 // Handle a subscribe for the different protocol levels.
162 func HandleSubscribe(c
*stompngo
.Connection
, d
, i
, a
string) <-chan stompngo
.MessageData
{
163 h
:= stompngo
.Headers
{"destination", d
, "ack", a
}
165 switch c
.Protocol() {
166 case stompngo
.SPL_12
:
167 // Add required id header
169 case stompngo
.SPL_11
:
170 // Add required id header
172 case stompngo
.SPL_10
:
173 // Nothing else to do here
175 llu
.Fatalf("v1:%v v2:%v\n", "subscribe invalid protocol level, should not happen",
179 r
, e
:= c
.Subscribe(h
)
181 llu
.Fatalf("v1:%v v2:%v\n", "subscribe failed", e
)
186 // Handle a unsubscribe for the different protocol levels.
187 func HandleUnsubscribe(c
*stompngo
.Connection
, d
, i
string) {
188 sbh
:= stompngo
.Headers
{}
190 switch c
.Protocol() {
191 case stompngo
.SPL_12
:
192 sbh
= sbh
.Add("id", i
)
193 case stompngo
.SPL_11
:
194 sbh
= sbh
.Add("id", i
)
195 case stompngo
.SPL_10
:
196 sbh
= sbh
.Add("destination", d
)
198 llu
.Fatalf("v1:%v v2:%v\n", "unsubscribe invalid protocol level, should not happen",
201 e
:= c
.Unsubscribe(sbh
)
203 llu
.Fatalf("v1:%v v2:%v d:%v\n", "unsubscribe failed", e
, d
)
208 // Handle ACKs for the different protocol levels.
209 func HandleAck(c
*stompngo
.Connection
, h stompngo
.Headers
, id
string) {
210 ah
:= stompngo
.Headers
{}
212 switch c
.Protocol() {
213 case stompngo
.SPL_12
:
214 ah
= ah
.Add("id", h
.Value("ack"))
215 case stompngo
.SPL_11
:
216 ah
= ah
.Add("message-id", h
.Value("message-id")).Add("subscription", id
)
217 case stompngo
.SPL_10
:
218 ah
= ah
.Add("message-id", h
.Value("message-id"))
220 llu
.Fatalf("v1:%v v2:%v\n", "ack invalid protocol level, should not happen",
223 if cv
, ok
:= h
.Contains(stompngo
.HK_RECEIPT
); ok
{
224 ah
= ah
.Add(stompngo
.HK_RECEIPT
, cv
)
228 llu
.Fatalf("v1:%v v2:%v v3:%v\n", "ack failed", e
, c
.Protocol())
233 func ShowRunParms(exampid
string) {
234 llu
.Printf("%sHOST:%v\n", exampid
, os
.Getenv("STOMP_HOST"))
235 llu
.Printf("%sPORT:%v\n", exampid
, os
.Getenv("STOMP_PORT"))
236 llu
.Printf("%sPROTOCOL:%v\n", exampid
, senv
.Protocol())
237 llu
.Printf("%sVHOST:%v\n", exampid
, senv
.Vhost())
238 llu
.Printf("%sNQS:%v\n", exampid
, Nqs())
239 llu
.Printf("%sNMSGS:%v\n", exampid
, senv
.Nmsgs())
240 llu
.Printf("%sSUBCHANCAP:%v\n", exampid
, senv
.SubChanCap())
241 llu
.Printf("%sRECVFACT:%v\n", exampid
, RecvFactor())
242 llu
.Printf("%sSENDFACT:%v\n", exampid
, SendFactor())
243 llu
.Printf("%sRECVWAIT:%t\n", exampid
, RecvWait())
244 llu
.Printf("%sSENDWAIT:%t\n", exampid
, SendWait())
245 llu
.Printf("%sACKMODE:%v\n", exampid
, AckMode())
248 // Return broker identity
249 func ServerIdent(c
*stompngo
.Connection
) string {
250 cdh
:= c
.ConnectResponse
251 sr
, ok
:= cdh
.Headers
.Contains("server")
258 // Common example connect logic
259 func CommonConnect(exampid
, tag
string, l
*log
.Logger
) (net
.Conn
,
260 *stompngo
.Connection
,
263 l
.Printf("%stag:%s consess:%v common_connect_starts\n",
266 // Set up the connection.
267 h
, p
:= senv
.HostAndPort()
268 hap
:= net
.JoinHostPort(h
, p
)
269 n
, e
:= net
.Dial("tcp", hap
)
274 l
.Printf("%stag:%s connsess:%s common_connect_host_and_port:%v\n",
278 // Create connect headers and connect to stompngo
279 ch
:= ConnectHeaders()
280 l
.Printf("%stag:%s connsess:%s common_connect_headers headers:%v\n",
283 conn
, e
:= stompngo
.Connect(n
, ch
)
287 SetLogger(conn
) // Maybe set a connection logger
288 l
.Printf("%stag:%s connsess:%s common_connect_complete host:%s port:%s vhost:%s protocol:%s server:%s\n",
289 exampid
, tag
, conn
.Session(),
290 h
, p
, senv
.Vhost(), conn
.Protocol(), ServerIdent(conn
))
292 // Show connect response
293 l
.Printf("%stag:%s connsess:%s common_connect_response connresp:%v\n",
294 exampid
, tag
, conn
.Session(),
295 conn
.ConnectResponse
)
298 l
.Printf("%stag:%s connsess:%s common_connect_heart_beat_send hbsend:%d\n",
299 exampid
, tag
, conn
.Session(),
300 conn
.SendTickerInterval())
301 l
.Printf("%stag:%s connsess:%s common_connect_heart_beat_recv hbrecv:%d\n",
302 exampid
, tag
, conn
.Session(),
303 conn
.ReceiveTickerInterval())
305 l
.Printf("%stag:%s connsess:%s common_connect_local_addr:%s\n",
306 exampid
, tag
, conn
.Session(),
307 n
.LocalAddr().String())
308 l
.Printf("%stag:%s connsess:%s common_connect_remote_addr:%s\n",
309 exampid
, tag
, conn
.Session(),
310 n
.RemoteAddr().String())
316 // Common example disconnect logic
317 func CommonDisconnect(n net
.Conn
, conn
*stompngo
.Connection
,
319 l
*log
.Logger
) error
{
321 // Disconnect from the Stomp server
322 e
:= conn
.Disconnect(stompngo
.Headers
{})
326 l
.Printf("%stag:%s consess:%v common_disconnect_complete local_addr:%s remote_addr:%s\n",
327 exampid
, tag
, conn
.Session(),
328 n
.LocalAddr().String(), n
.RemoteAddr().String())
330 // Close the network connection
337 l
.Printf("%stag:%s consess:%v common_disconnect_network_close_complete\n",
338 exampid
, tag
, conn
.Session())
339 l
.Printf("%stag:%s consess:%v common_disconnect_ends\n",
340 exampid
, tag
, conn
.Session())
346 // Common example TLS connect logic
347 func CommonTLSConnect(exampid
, tag
string, l
*log
.Logger
,
348 c
*tls
.Config
) (net
.Conn
, *stompngo
.Connection
, error
) {
350 l
.Printf("%stag:%s consess:%s common_tls_connect_starts\n",
353 // Set up the connection.
354 h
, p
:= senv
.HostAndPort()
355 hap
:= net
.JoinHostPort(h
, p
)
356 n
, e
:= net
.Dial("tcp", hap
)
361 c
.ServerName
= h
// SNI
363 nc
:= tls
.Client(n
, c
) // Returns: *tls.Conn : implements net.Conn
366 if e
.Error() == "EOF" {
367 l
.Printf("%stag:%s consess:%s common_tls_handshake_EOF_Is_the_broker_port_TLS_enabled? port:%s\n",
371 l
.Fatalf("%stag:%s consess:%s common_tls_handshake_failed error:%v\n",
375 l
.Printf("%stag:%s consess:%s common_tls_handshake_complete\n",
378 l
.Printf("%stag:%s connsess:%s common_tls_connect_host_and_port:%v\n",
382 // Create connect headers and connect to stompngo
383 ch
:= ConnectHeaders()
384 l
.Printf("%stag:%s connsess:%s common_tls_connect_headers headers:%v\n",
387 conn
, e
:= stompngo
.Connect(nc
, ch
)
392 l
.Printf("%stag:%s connsess:%s common_tls_connect_complete host:%s vhost:%s protocol:%s server:%s\n",
393 exampid
, tag
, conn
.Session(),
394 h
, senv
.Vhost(), conn
.Protocol(), ServerIdent(conn
))
396 // Show connect response
397 l
.Printf("%stag:%s connsess:%s common_tls_connect_response connresp:%v\n",
398 exampid
, tag
, conn
.Session(),
399 conn
.ConnectResponse
)
401 // Show heartbeat data (if heart beats are being used)
402 if senv
.Heartbeats() != "" {
403 l
.Printf("%stag:%s connsess:%s common_tls_connect_heart_beat_send hbsend:%v\n",
404 exampid
, tag
, conn
.Session(),
405 conn
.SendTickerInterval())
406 l
.Printf("%stag:%s connsess:%s common_tls_connect_heart_beat_recv hbrecv:%v\n",
407 exampid
, tag
, conn
.Session(),
408 conn
.ReceiveTickerInterval())
411 l
.Printf("%stag:%s connsess:%s common_tls_connect_local_addr:%s\n",
412 exampid
, tag
, conn
.Session(),
413 n
.LocalAddr().String())
414 l
.Printf("%stag:%s connsess:%s common_tls_connect_remote_addr:%s\n",
415 exampid
, tag
, conn
.Session(),
416 n
.RemoteAddr().String())
422 // Example destination
425 if os
.Getenv("STOMP_ARTEMIS") == "" {
429 if strings
.Index(d
, "topic") >= 0 {
432 return pref
+ strings
.Replace(d
, "/", ".", -1)
436 func SetLogger(conn
*stompngo
.Connection
) {
438 ul
:= log
.New(os
.Stdout
, Logger()+" ", log
.Ldate|log
.Lmicroseconds|log
.Lshortfile
)