Version bump.
[stompngo_examples.git] / publish / publish.go
blobc688177d3e6d138cfa40ec776a44edfa48c8aa28
1 //
2 // Copyright © 2013-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 Publish messages to a STOMP broker.
20 Examples:
22 # Publish to a broker with all defaults:
23 # Host is "localhost"
24 # Port is 61613
25 # Login is "guest"
26 # Passcode is "guest"
27 # Virtual Host is "localhost"
28 # Protocol is 1.1
29 go run publish.go
31 # Publish to a broker using STOMP protocol level 1.0:
32 STOMP_PROTOCOL=1.0 go run publish.go
34 # Publish to a broker using a custom host and port:
35 STOMP_HOST=tjjackson STOMP_PORT=62613 go run publish.go
37 # Publish to a broker using a custom port and virtual host:
38 STOMP_PORT=41613 STOMP_VHOST="/" go run publish.go
40 # Publish to a broker using a custom login and passcode:
41 STOMP_LOGIN="userid" STOMP_PASSCODE="t0ps3cr3t" go run publish.go
43 # Important environment variables for this program are:
45 # STOMP_NGORS - the number of go routines used to write to the
46 # specified queues.
48 # STOMP_NMSGS - the number of messages each go routine will write.
50 # STOMP_NQS - The number of queues to write messages to. If this
51 # variable is absent, the value defaults to the value specified
52 # for STOMP_NGORS. If this value is specified, all go routines
53 # are multi-plexed across this number of queues.
56 package main
58 import (
59 "fmt"
60 "log"
61 "net"
62 "os"
63 "runtime"
64 "runtime/pprof"
65 "strconv"
66 "sync"
67 "time"
69 "github.com/gmallard/stompngo"
70 // senv methods could be used in general by stompngo clients.
71 "github.com/gmallard/stompngo/senv"
72 // sngecomm methods are used specifically for these example clients.
73 "github.com/gmallard/stompngo_examples/sngecomm"
76 var (
77 exampid = "publish: "
78 ll = log.New(os.Stdout, "EPUB ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
79 tag = "pubmain"
80 wg sync.WaitGroup
81 conn *stompngo.Connection
82 n net.Conn
83 e error
84 nqs int
85 ngor int
86 // MNHDR is the message number, in message headers
87 MNHDR = "sng_msgnum"
88 gorstr = 1 // Starting destination number
90 msfl = true // Fixed message length
91 mslen = 1024 // The fixed length
92 msf []byte
94 gorsl = false
95 gorslfb = false
96 gorslfx = time.Duration(250 * time.Millisecond)
97 gorslms = 250
99 max int64 = 1e9 // Max stagger time (nanoseconds)
100 min = max / 10 // Min stagger time (nanoseconds)
101 // Sleep multipliers
102 sf = 1.0
103 rf = 1.0
106 func init() {
107 ngor = sngecomm.Ngors()
108 nqs = sngecomm.Nqs()
110 // Options around message length:
111 // 1) fixed length
112 // 2) randomly variable length
113 if os.Getenv("STOMP_VARMSL") != "" {
114 msfl = false // Use randomly variable message lengths
116 if msfl {
117 if s := os.Getenv("STOMP_FXMSLEN"); s != "" {
118 i, e := strconv.ParseInt(s, 10, 32)
119 if nil != e {
120 log.Printf("v1:%v v2:%v\n", "FXMSLEN conversion error", e)
121 } else {
122 mslen = int(i) // The fixed length to use
125 msf = sngecomm.PartialSubstr(mslen)
128 // Options controlling sleeps between message sends. Options are:
129 // 1) Don't sleep
130 // 2) Sleep a fixed amount of time
131 // 3) Sleep a random variable amount of time
132 if os.Getenv("STOMP_DOSLEEP") != "" {
133 gorsl = true // Do sleep
135 if os.Getenv("STOMP_FIXSLEEP") != "" {
136 gorslfb = true // Do a fixed length sleep
138 if gorsl {
139 var err error
140 if s := os.Getenv("STOMP_SLEEPMS"); s != "" { // Fixed length milliseconds
141 mss := fmt.Sprintf("%s", s) + "ms"
142 gorslfx, err = time.ParseDuration(mss)
143 if err != nil {
144 log.Printf("v1:%v v2:%v v3:%v\n", "ParseDuration conversion error", mss, e)
148 // Option controlling destination numbering. Destinations are normally
149 // suffixed with a sequence number, starting at 1. This option allows
150 // that starting sequence number to be arbitrary.
151 if s := os.Getenv("STOMP_GORNSTR"); s != "" {
152 i, e := strconv.ParseInt(s, 10, 32)
153 if nil != e {
154 log.Printf("v1:%v v2:%v\n", "GORNSTR conversion error", e)
155 } else {
156 gorstr = int(i) // The starting sequence number to use.
160 func runSends(gr int, qn int) {
161 var err error
162 qns := fmt.Sprintf("%d", qn)
163 qname := sngecomm.Dest() + "." + qns
164 sh := stompngo.Headers{"destination", qname}
165 ll.Printf("%stag:%s connsess:%s destination dest:%s BEGIN_runSends %d\n",
166 exampid, tag, conn.Session(),
167 qname, gr)
168 if senv.Persistent() {
169 sh = sh.Add("persistent", "true")
171 sh = sh.Add(MNHDR, "0")
172 mnhnum := sh.Index(MNHDR)
173 ll.Printf("%stag:%s connsess:%s send headers:%v\n",
174 exampid, tag, conn.Session(),
176 for i := 1; i <= senv.Nmsgs(); i++ {
177 is := fmt.Sprintf("%d", i) // Next message number
178 sh[mnhnum+1] = is // Put message number in headers
179 // Log send headers
180 ll.Printf("%stag:%s connsess:%s main_sending gr:%d hdrs:%v\n",
181 exampid, tag, conn.Session(),
182 gr, sh)
184 // Handle fixed or variable message length
185 rml := 0
186 if msfl {
187 err = conn.SendBytes(sh, msf)
188 rml = len(msf)
189 } else {
190 // ostr := string(sngecomm.Partial())
191 // err = conn.Send(sh, ostr)
192 oby := sngecomm.Partial()
193 err = conn.SendBytes(sh, oby)
194 rml = len(oby)
196 if err != nil {
197 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
198 exampid, tag, conn.Session(),
199 err.Error()) // Handle this ......
201 ll.Printf("%stag:%s connsess:%s main_send_complete gr:%d msfl:~%t~len:%d\n",
202 exampid, tag, conn.Session(),
203 gr, msfl, rml)
205 // Handle sleep options
206 if gorsl {
207 if gorslfb {
208 // Fixed time to sleep
209 ll.Printf("%stag:%s connsess:%s gr:%d main_fixed sleep:~%v\n",
210 exampid, tag, conn.Session(), gr, gorslfx)
211 time.Sleep(gorslfx)
212 } else {
213 // Variable time to sleep
214 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
215 ll.Printf("%stag:%s connsess:%s gr:%d main_rand sleep:~%v\n",
216 exampid, tag, conn.Session(), gr, dt)
217 time.Sleep(dt)
221 if sngecomm.UseEOF() {
222 sh := stompngo.Headers{"destination", qname}
223 _ = conn.Send(sh, sngecomm.EOFMsg)
224 ll.Printf("%stag:%s connsess:%s gr:%d sent EOF [%s]\n",
225 exampid, tag, conn.Session(), gr, sngecomm.EOFMsg)
227 wg.Done() // signal a goroutine completion
230 // Connect to a STOMP broker, publish some messages and disconnect.
231 func main() {
233 if sngecomm.Pprof() {
234 if sngecomm.Cpuprof() != "" {
235 ll.Printf("%stag:%s connsess:%s CPUPROF %s\n",
236 exampid, tag, sngecomm.Lcs, sngecomm.Cpuprof())
237 f, err := os.Create(sngecomm.Cpuprof())
238 if err != nil {
239 log.Fatal("could not create CPU profile: ", err)
241 if err := pprof.StartCPUProfile(f); err != nil {
242 log.Fatal("could not start CPU profile: ", err)
244 defer pprof.StopCPUProfile()
248 st := time.Now()
250 // Standard example connect sequence
251 n, conn, e = sngecomm.CommonConnect(exampid, tag, ll)
252 if e != nil {
253 ll.Fatalf("%stag:%s connsess:%s main_on_connect error:%v",
254 exampid, tag, sngecomm.Lcs,
255 e.Error()) // Handle this ......
258 ll.Printf("%stag:%s connsess:%s START gorstr:%d ngor:%d nqs:%d nmsgs:%d\n",
259 exampid, tag, conn.Session(), gorstr, ngor, nqs, senv.Nmsgs())
261 rqn := gorstr - 1
262 for i := gorstr; i <= gorstr+ngor-1; i++ {
263 wg.Add(1)
264 rqn++
265 if nqs > 1 && rqn > nqs {
266 rqn = gorstr
268 go runSends(i, rqn)
270 wg.Wait()
272 // Standard example disconnect sequence
273 e = sngecomm.CommonDisconnect(n, conn, exampid, tag, ll)
274 if e != nil {
275 ll.Fatalf("%stag:%s connsess:%s main_on_disconnect error:%v",
276 exampid, tag, conn.Session(),
277 e.Error()) // Handle this ......
280 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
281 exampid, tag, conn.Session(),
282 time.Now().Sub(st))
284 if sngecomm.Pprof() {
285 if sngecomm.Memprof() != "" {
286 ll.Printf("%stag:%s connsess:%s MEMPROF %s\n",
287 exampid, tag, conn.Session(), sngecomm.Memprof())
288 f, err := os.Create(sngecomm.Memprof())
289 if err != nil {
290 log.Fatal("could not create memory profile: ", err)
292 runtime.GC() // get up-to-date statistics
293 if err := pprof.WriteHeapProfile(f); err != nil {
294 log.Fatal("could not write memory profile: ", err)
296 f.Close()