Version bump.
[stompngo_examples.git] / sngecomm / utilities.go
blobbf6a45cf293f773185f1b6c4c188d00c82cb0225
1 //
2 // Copyright © 2016-2018 Guy M. Alluard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
/*
Package sngecomm provides common functionality used in the stompngo_examples
project.
*/
21 package sngecomm
23 import (
24 "crypto/rand"
25 "crypto/tls"
26 "log"
27 "math/big"
28 "net"
29 "os"
30 "strings"
32 "github.com/gmallard/stompngo"
33 "github.com/gmallard/stompngo/senv"
// llu is the logger used by the helpers in this file. It writes to standard
// output with a "UTIL " prefix, the date, a microsecond timestamp and the
// short file name of the call site.
var llu = log.New(os.Stdout, "UTIL ", log.Ldate|log.Lmicroseconds|log.Lshortfile)

// Lcs is the placeholder the connect helpers log in the session field before
// a connection (and therefore a session id) exists.
var Lcs = "NotAvailable"
41 // Provide connect headers
42 func ConnectHeaders() stompngo.Headers {
43 h := stompngo.Headers{}
44 l := senv.Login()
45 if l != "" {
46 h = h.Add("login", l)
48 pc := senv.Passcode()
49 if pc != "" {
50 h = h.Add("passcode", pc)
53 p := senv.Protocol()
54 if p != stompngo.SPL_10 { // 1.1 and 1.2
55 h = h.Add("accept-version", p).Add("host", senv.Vhost())
56 hb := senv.Heartbeats()
57 if hb != "" {
58 h = h.Add("heart-beat", hb)
63 return h
66 // Show connection metrics.
67 func ShowStats(exampid, tag string, conn *stompngo.Connection) {
68 r := conn.FramesRead()
69 br := conn.BytesRead()
70 w := conn.FramesWritten()
71 bw := conn.BytesWritten()
72 s := conn.Running().Seconds()
73 n := conn.Running().Nanoseconds()
74 llu.Printf("%stag:%s frame_read_count:%v\n", exampid, tag, r)
75 llu.Printf("%stag:%s bytes_read:%v\n", exampid, tag, br)
76 llu.Printf("%stag:%s frame_write_count:%v\n", exampid, tag, w)
77 llu.Printf("%stag:%s bytes_written:%v\n", exampid, tag, bw)
78 llu.Printf("%stag:%s current_duration(ns):%v\n", exampid, tag, n)
80 llu.Printf("%stag:%s current_duration(sec):%20.6f\n", exampid, tag, s)
81 llu.Printf("%stag:%s frame_reads/sec:%20.6f\n", exampid, tag, float64(r)/s)
82 llu.Printf("%stag:%s bytes_read/sec:%20.6f\n", exampid, tag, float64(br)/s)
83 llu.Printf("%stag:%s frame_writes/sec:%20.6f\n", exampid, tag, float64(w)/s)
84 llu.Printf("%stag:%s bytes_written/sec:%20.6f\n", exampid, tag, float64(bw)/s)
// ValueBetween returns fact multiplied by a random value drawn from the
// half-open range [min, max), converted to int64 (truncating toward zero).
//
// The random value comes from crypto/rand. When the range is empty
// (max <= min), or the random source reports an error, min is used in place
// of the random value rather than panicking.
func ValueBetween(min, max int64, fact float64) int64 {
	base := min
	if span := max - min; span > 0 {
		// rand.Int panics on a non-positive bound, hence the guard above.
		if off, err := rand.Int(rand.Reader, big.NewInt(span)); err == nil {
			base += off.Int64()
		}
	}
	return int64(fact * float64(base))
}
93 // Dump a TLS Configuration Struct
94 func DumpTLSConfig(exampid string, c *tls.Config, n *tls.Conn) {
95 llu.Printf("%s TLSConfig:\n", exampid)
96 llu.Printf("%s Rand:%#v\n", exampid, c.Rand)
97 if c.Time != nil {
98 llu.Printf("%s Time:%v\n", exampid, c.Time())
99 } else {
100 llu.Printf("%s Time:%v\n", exampid, nil)
102 llu.Printf("%s Certificates:%#v\n", exampid, c.Certificates)
103 llu.Printf("%s NameToCertificate:%#v\n", exampid, c.NameToCertificate)
104 llu.Printf("%s RootCAs:%#v\n", exampid, c.RootCAs)
105 llu.Printf("%s NextProtos:%v\n", exampid, c.NextProtos)
106 llu.Printf("%s ServerName:%v\n", exampid, c.ServerName)
107 llu.Printf("%s ClientAuth:%v\n", exampid, c.ClientAuth)
108 llu.Printf("%s ClientCAs:%v#\n", exampid, c.ClientCAs)
109 llu.Printf("%s CipherSuites:%#v\n", exampid, c.CipherSuites)
110 llu.Printf("%s PreferServerCipherSuites:%v\n", exampid, c.PreferServerCipherSuites)
111 llu.Printf("%s SessionTicketsDisabled:%v\n", exampid, c.SessionTicketsDisabled)
112 llu.Printf("%s SessionTicketKey:%#v\n", exampid, c.SessionTicketKey)
114 // Idea Embelluished From:
115 // https://groups.google.com/forum/#!topic/golang-nuts/TMNdOxugbTY
116 cs := n.ConnectionState()
117 llu.Printf("%s HandshakeComplete:%v\n", exampid, cs.HandshakeComplete)
118 llu.Printf("%s DidResume:%v\n", exampid, cs.DidResume)
119 llu.Printf("%s CipherSuite:%d(0x%X)\n", exampid, cs.CipherSuite, cs.CipherSuite)
120 llu.Printf("%s NegotiatedProtocol:%v\n", exampid, cs.NegotiatedProtocol)
121 llu.Printf("%s NegotiatedProtocolIsMutual:%v\n", exampid, cs.NegotiatedProtocolIsMutual)
122 // llu.Printf("%s ServerName:%v\n", exampid, cs.ServerName) // Server side only
123 // Portions of any Peer Certificates present
124 certs := cs.PeerCertificates
125 if certs == nil || len(certs) < 1 {
126 llu.Printf("Could not get server's certificate from the TLS connection.\n")
127 return
129 if len(certs) == 1 {
130 llu.Printf("%s There is %d Server Cert:\n", exampid, len(certs))
131 } else {
132 llu.Printf("%s There are %d Server Certs:\n", exampid, len(certs))
135 for i, cert := range certs {
136 llu.Printf("%s Certificate chain:%d\n", exampid, i)
137 llu.Printf("%s Common Name:%s\n", exampid, cert.Subject.CommonName)
139 llu.Printf("%s Subject Alternative Names (DNSNames):\n", exampid)
140 for idx, dnsn := range cert.DNSNames {
141 llu.Printf("%s \tNumber:%d, DNS Name:%s\n", exampid, idx+1, dnsn)
144 llu.Printf("%s Subject Alternative Names (Emailaddresses):\n", exampid)
145 for idx, enn := range cert.EmailAddresses {
146 llu.Printf("%s \tNumber:%d, DNS Name:%s\n", exampid, idx+1, enn)
149 llu.Printf("%s Subject Alternative Names (IPAddresses):\n", exampid)
150 for idx, ipadn := range cert.IPAddresses {
151 llu.Printf("%s \tNumber:%d, DNS Name:%v\n", exampid, idx+1, ipadn)
154 llu.Printf("%s Valid Not Before:%s\n", exampid, cert.NotBefore.Local().String())
155 llu.Printf("%s Valid Not After:%s\n", exampid, cert.NotAfter.Local().String())
156 llu.Println(strings.Repeat("=", 80))
161 // Handle a subscribe for the different protocol levels.
162 func HandleSubscribe(c *stompngo.Connection, d, i, a string) <-chan stompngo.MessageData {
163 h := stompngo.Headers{"destination", d, "ack", a}
165 switch c.Protocol() {
166 case stompngo.SPL_12:
167 // Add required id header
168 h = h.Add("id", i)
169 case stompngo.SPL_11:
170 // Add required id header
171 h = h.Add("id", i)
172 case stompngo.SPL_10:
173 // Nothing else to do here
174 default:
175 llu.Fatalf("v1:%v v2:%v\n", "subscribe invalid protocol level, should not happen",
176 c.Protocol())
179 r, e := c.Subscribe(h)
180 if e != nil {
181 llu.Fatalf("v1:%v v2:%v\n", "subscribe failed", e)
183 return r
186 // Handle a unsubscribe for the different protocol levels.
187 func HandleUnsubscribe(c *stompngo.Connection, d, i string) {
188 sbh := stompngo.Headers{}
190 switch c.Protocol() {
191 case stompngo.SPL_12:
192 sbh = sbh.Add("id", i)
193 case stompngo.SPL_11:
194 sbh = sbh.Add("id", i)
195 case stompngo.SPL_10:
196 sbh = sbh.Add("destination", d)
197 default:
198 llu.Fatalf("v1:%v v2:%v\n", "unsubscribe invalid protocol level, should not happen",
199 c.Protocol())
201 e := c.Unsubscribe(sbh)
202 if e != nil {
203 llu.Fatalf("v1:%v v2:%v d:%v\n", "unsubscribe failed", e, d)
205 return
208 // Handle ACKs for the different protocol levels.
209 func HandleAck(c *stompngo.Connection, h stompngo.Headers, id string) {
210 ah := stompngo.Headers{}
212 switch c.Protocol() {
213 case stompngo.SPL_12:
214 ah = ah.Add("id", h.Value("ack"))
215 case stompngo.SPL_11:
216 ah = ah.Add("message-id", h.Value("message-id")).Add("subscription", id)
217 case stompngo.SPL_10:
218 ah = ah.Add("message-id", h.Value("message-id"))
219 default:
220 llu.Fatalf("v1:%v v2:%v\n", "ack invalid protocol level, should not happen",
221 c.Protocol())
223 if cv, ok := h.Contains(stompngo.HK_RECEIPT); ok {
224 ah = ah.Add(stompngo.HK_RECEIPT, cv)
226 e := c.Ack(ah)
227 if e != nil {
228 llu.Fatalf("v1:%v v2:%v v3:%v\n", "ack failed", e, c.Protocol())
230 return
233 func ShowRunParms(exampid string) {
234 llu.Printf("%sHOST:%v\n", exampid, os.Getenv("STOMP_HOST"))
235 llu.Printf("%sPORT:%v\n", exampid, os.Getenv("STOMP_PORT"))
236 llu.Printf("%sPROTOCOL:%v\n", exampid, senv.Protocol())
237 llu.Printf("%sVHOST:%v\n", exampid, senv.Vhost())
238 llu.Printf("%sNQS:%v\n", exampid, Nqs())
239 llu.Printf("%sNMSGS:%v\n", exampid, senv.Nmsgs())
240 llu.Printf("%sSUBCHANCAP:%v\n", exampid, senv.SubChanCap())
241 llu.Printf("%sRECVFACT:%v\n", exampid, RecvFactor())
242 llu.Printf("%sSENDFACT:%v\n", exampid, SendFactor())
243 llu.Printf("%sRECVWAIT:%t\n", exampid, RecvWait())
244 llu.Printf("%sSENDWAIT:%t\n", exampid, SendWait())
245 llu.Printf("%sACKMODE:%v\n", exampid, AckMode())
248 // Return broker identity
249 func ServerIdent(c *stompngo.Connection) string {
250 cdh := c.ConnectResponse
251 sr, ok := cdh.Headers.Contains("server")
252 if !ok {
253 return "N/A"
255 return sr
258 // Common example connect logic
259 func CommonConnect(exampid, tag string, l *log.Logger) (net.Conn,
260 *stompngo.Connection,
261 error) {
263 l.Printf("%stag:%s consess:%v common_connect_starts\n",
264 exampid, tag, Lcs)
266 // Set up the connection.
267 h, p := senv.HostAndPort()
268 hap := net.JoinHostPort(h, p)
269 n, e := net.Dial("tcp", hap)
270 if e != nil {
271 return nil, nil, e
274 l.Printf("%stag:%s connsess:%s common_connect_host_and_port:%v\n",
275 exampid, tag, Lcs,
276 hap)
278 // Create connect headers and connect to stompngo
279 ch := ConnectHeaders()
280 l.Printf("%stag:%s connsess:%s common_connect_headers headers:%v\n",
281 exampid, tag, Lcs,
283 conn, e := stompngo.Connect(n, ch)
284 if e != nil {
285 return nil, conn, e
287 SetLogger(conn) // Maybe set a connection logger
288 l.Printf("%stag:%s connsess:%s common_connect_complete host:%s port:%s vhost:%s protocol:%s server:%s\n",
289 exampid, tag, conn.Session(),
290 h, p, senv.Vhost(), conn.Protocol(), ServerIdent(conn))
292 // Show connect response
293 l.Printf("%stag:%s connsess:%s common_connect_response connresp:%v\n",
294 exampid, tag, conn.Session(),
295 conn.ConnectResponse)
297 // Heartbeat Data
298 l.Printf("%stag:%s connsess:%s common_connect_heart_beat_send hbsend:%d\n",
299 exampid, tag, conn.Session(),
300 conn.SendTickerInterval())
301 l.Printf("%stag:%s connsess:%s common_connect_heart_beat_recv hbrecv:%d\n",
302 exampid, tag, conn.Session(),
303 conn.ReceiveTickerInterval())
305 l.Printf("%stag:%s connsess:%s common_connect_local_addr:%s\n",
306 exampid, tag, conn.Session(),
307 n.LocalAddr().String())
308 l.Printf("%stag:%s connsess:%s common_connect_remote_addr:%s\n",
309 exampid, tag, conn.Session(),
310 n.RemoteAddr().String())
313 return n, conn, nil
316 // Common example disconnect logic
317 func CommonDisconnect(n net.Conn, conn *stompngo.Connection,
318 exampid, tag string,
319 l *log.Logger) error {
321 // Disconnect from the Stomp server
322 e := conn.Disconnect(stompngo.Headers{})
323 if e != nil {
324 return e
326 l.Printf("%stag:%s consess:%v common_disconnect_complete local_addr:%s remote_addr:%s\n",
327 exampid, tag, conn.Session(),
328 n.LocalAddr().String(), n.RemoteAddr().String())
330 // Close the network connection
331 e = n.Close()
332 if e != nil {
333 return e
336 // Parting messages
337 l.Printf("%stag:%s consess:%v common_disconnect_network_close_complete\n",
338 exampid, tag, conn.Session())
339 l.Printf("%stag:%s consess:%v common_disconnect_ends\n",
340 exampid, tag, conn.Session())
343 return nil
346 // Common example TLS connect logic
347 func CommonTLSConnect(exampid, tag string, l *log.Logger,
348 c *tls.Config) (net.Conn, *stompngo.Connection, error) {
350 l.Printf("%stag:%s consess:%s common_tls_connect_starts\n",
351 exampid, tag, Lcs)
353 // Set up the connection.
354 h, p := senv.HostAndPort()
355 hap := net.JoinHostPort(h, p)
356 n, e := net.Dial("tcp", hap)
357 if e != nil {
358 return nil, nil, e
361 c.ServerName = h // SNI
363 nc := tls.Client(n, c) // Returns: *tls.Conn : implements net.Conn
364 e = nc.Handshake()
365 if e != nil {
366 if e.Error() == "EOF" {
367 l.Printf("%stag:%s consess:%s common_tls_handshake_EOF_Is_the_broker_port_TLS_enabled? port:%s\n",
368 exampid, tag, Lcs,
371 l.Fatalf("%stag:%s consess:%s common_tls_handshake_failed error:%v\n",
372 exampid, tag, Lcs,
373 e.Error())
375 l.Printf("%stag:%s consess:%s common_tls_handshake_complete\n",
376 exampid, tag, Lcs)
378 l.Printf("%stag:%s connsess:%s common_tls_connect_host_and_port:%v\n",
379 exampid, tag, Lcs,
380 hap)
382 // Create connect headers and connect to stompngo
383 ch := ConnectHeaders()
384 l.Printf("%stag:%s connsess:%s common_tls_connect_headers headers:%v\n",
385 exampid, tag, Lcs,
387 conn, e := stompngo.Connect(nc, ch)
388 if e != nil {
389 return nil, nil, e
391 SetLogger(conn)
392 l.Printf("%stag:%s connsess:%s common_tls_connect_complete host:%s vhost:%s protocol:%s server:%s\n",
393 exampid, tag, conn.Session(),
394 h, senv.Vhost(), conn.Protocol(), ServerIdent(conn))
396 // Show connect response
397 l.Printf("%stag:%s connsess:%s common_tls_connect_response connresp:%v\n",
398 exampid, tag, conn.Session(),
399 conn.ConnectResponse)
401 // Show heartbeat data (if heart beats are being used)
402 if senv.Heartbeats() != "" {
403 l.Printf("%stag:%s connsess:%s common_tls_connect_heart_beat_send hbsend:%v\n",
404 exampid, tag, conn.Session(),
405 conn.SendTickerInterval())
406 l.Printf("%stag:%s connsess:%s common_tls_connect_heart_beat_recv hbrecv:%v\n",
407 exampid, tag, conn.Session(),
408 conn.ReceiveTickerInterval())
411 l.Printf("%stag:%s connsess:%s common_tls_connect_local_addr:%s\n",
412 exampid, tag, conn.Session(),
413 n.LocalAddr().String())
414 l.Printf("%stag:%s connsess:%s common_tls_connect_remote_addr:%s\n",
415 exampid, tag, conn.Session(),
416 n.RemoteAddr().String())
419 return nc, conn, nil
422 // Example destination
423 func Dest() string {
424 d := senv.Dest()
425 if os.Getenv("STOMP_ARTEMIS") == "" {
426 return d
428 pref := "jms.queue"
429 if strings.Index(d, "topic") >= 0 {
430 pref = "jms.topic"
432 return pref + strings.Replace(d, "/", ".", -1)
435 // Set Logger
436 func SetLogger(conn *stompngo.Connection) {
437 if Logger() != "" {
438 ul := log.New(os.Stdout, Logger()+" ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
439 conn.SetLogger(ul)