Version bump.
[stompngo_examples.git] / srmgor_manyconn / srmgor_manyconn.go
blob20fd05bff553c2ec830c3fdcf478e30c1827f585
1 //
2 // Copyright © 2012-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 // Show a number of queue writers and readers operating concurrently.
18 // Try to be realistic about workloads.
19 // Receiver checks messages for proper queue and message number.
22 Send and receive many STOMP messages using multiple queues and goroutines
23 to service each send or receive instance. Each sender and receiver
24 operates under a unique network connection.
26 Examples:
28 # A few queues and a few messages:
29 STOMP_NQS=5 STOMP_NMSGS=10 go run srmgor_manyconn.go
32 package main
34 import (
35 "fmt"
36 "log"
37 "net"
38 "os"
39 "runtime"
40 "sync"
41 "time"
43 "github.com/gmallard/stompngo"
44 // senv methods could be used in general by stompngo clients.
45 "github.com/gmallard/stompngo/senv"
46 // sngecomm methods are used specifically for these example clients.
47 "github.com/gmallard/stompngo_examples/sngecomm"
50 var (
51 exampid = "srmgor_manyconn: "
53 wgs sync.WaitGroup
54 wgr sync.WaitGroup
56 // We 'stagger' between each message send and message receive for a random
57 // amount of time.
58 // Vary these for experimental purposes. YMMV.
59 max int64 = 1e9 // Max stagger time (nanoseconds)
60 min int64 = max / 10 // Min stagger time (nanoseconds)
62 // Wait flags
63 sw = true
64 rw = true
66 // Sleep multipliers
67 sf float64 = 1.0
68 rf float64 = 1.0
70 // Number of messages
71 nmsgs = senv.Nmsgs()
73 ll = log.New(os.Stdout, "EMSMR ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
75 tag = "manyconn"
78 func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
79 ltag := tag + "-sendmessages"
81 qns := fmt.Sprintf("%d", qnum) // queue number
82 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
83 ll.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n",
84 exampid, ltag, conn.Session(),
85 d, qnum)
86 wh := stompngo.Headers{"destination", d,
87 "qnum", qns} // send Headers
88 if senv.Persistent() {
89 wh = wh.Add("persistent", "true")
92 tmr := time.NewTimer(100 * time.Hour)
93 // Send messages
94 for mc := 1; mc <= nmsgs; mc++ {
95 mcs := fmt.Sprintf("%d", mc)
96 sh := append(wh, "msgnum", mcs)
97 // Generate a message to send ...............
99 ll.Printf("%stag:%s connsess:%s message mc:%d qnum:%d\n",
100 exampid, ltag, conn.Session(),
101 mc, qnum)
102 e := conn.Send(sh, string(sngecomm.Partial()))
103 if e != nil {
104 ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
105 exampid, ltag, conn.Session(),
106 qnum, e.Error()) // Handle this ......
108 if mc == nmsgs {
109 break
111 if sw {
112 runtime.Gosched() // yield for this example
113 dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
114 ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qnum:%d mc:%d\n",
115 exampid, ltag, conn.Session(),
116 dt, qnum, mc)
117 tmr.Reset(dt)
118 _ = <-tmr.C
123 func receiveMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
124 ltag := tag + "-receivemessages"
126 qns := fmt.Sprintf("%d", qnum) // queue number
127 d := sngecomm.Dest() + "." + string(exampid[:len(exampid)-2]) + "." + qns
128 id := stompngo.Uuid() // A unique subscription ID
130 ll.Printf("%stag:%s connsess:%s receiveMessages_start id:%s d:%s qnum:%d nmsgs:%d\n",
131 exampid, ltag, conn.Session(),
132 id, d, qnum, nmsgs)
133 // Subscribe
134 sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode())
136 pbc := sngecomm.Pbc() // Print byte count
139 tmr := time.NewTimer(100 * time.Hour)
140 var md stompngo.MessageData
141 for mc := 1; mc <= nmsgs; mc++ {
143 select {
144 case md = <-sc:
145 case md = <-conn.MessageData:
146 // Frames RECEIPT or ERROR not expected here
147 ll.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
148 exampid, ltag, conn.Session(),
149 qns, md) // Handle this ......
151 if md.Error != nil {
152 ll.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
153 exampid, ltag, conn.Session(),
154 qns, md.Error)
157 if md.Message.Command != stompngo.MESSAGE {
158 ll.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
159 exampid, ltag, conn.Session(),
160 qns, mc, md)
163 mcs := fmt.Sprintf("%d", mc) // message number
164 if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mcs) {
165 ll.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
166 exampid, tag, conn.Session(),
167 qns, mcs, md) // Handle this ......
170 // Process the inbound message .................
171 sl := len(md.Message.Body)
172 if pbc > 0 {
173 sl = pbc
174 if len(md.Message.Body) < sl {
175 sl = len(md.Message.Body)
178 ll.Printf("%stag:%s connsess:%s receiveMessages_msg d:%s body:%s qnum:%d msgnum:%s\n",
179 exampid, ltag, conn.Session(),
180 d, string(md.Message.Body[0:sl]), qnum,
181 md.Message.Headers.Value("msgnum"))
182 if mc == nmsgs {
183 break
185 // Handle ACKs if needed
186 if sngecomm.AckMode() != "auto" {
187 ah := []string{}
188 sngecomm.HandleAck(conn, ah, id)
190 if mc == nmsgs {
191 break
194 if rw {
195 runtime.Gosched() // yield for this example
196 dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
197 ll.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
198 exampid, ltag, conn.Session(),
199 dt, qns, mc)
200 tmr.Reset(dt)
201 _ = <-tmr.C
204 ll.Printf("%stag:%s connsess:%s end d:%s qnum:%d nmsgs:%d\n",
205 exampid, ltag, conn.Session(),
206 d, qnum, nmsgs)
208 // Unsubscribe
209 sngecomm.HandleUnsubscribe(conn, d, id)
213 func runReceiver(qnum int) {
214 ltag := tag + "-runreceiver"
216 ll.Printf("%stag:%s connsess:%s start qnum:%d\n",
217 exampid, ltag, sngecomm.Lcs,
218 qnum)
220 // Standard example connect sequence
221 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
222 if e != nil {
223 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
224 exampid, ltag, sngecomm.Lcs,
225 e.Error()) // Handle this ......
229 conn.SetSubChanCap(senv.SubChanCap()) // Experiment with this value, YMMV
230 // Receives
231 receiveMessages(conn, qnum, n)
233 ll.Printf("%stag:%s connsess:%s receives_complete qnum:%d\n",
234 exampid, ltag, conn.Session(),
235 qnum)
237 // Standard example disconnect sequence
238 e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
239 if e != nil {
240 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
241 exampid, ltag, conn.Session(),
242 e.Error()) // Handle this ......
245 sngecomm.ShowStats(exampid, "recv_"+fmt.Sprintf("%d", qnum), conn)
246 wgr.Done()
249 func runSender(qnum int) {
251 ltag := tag + "-runsender"
253 ll.Printf("%stag:%s connsess:%s start qnum:%d\n",
254 exampid, ltag, sngecomm.Lcs,
255 qnum)
256 // Standard example connect sequence
257 n, conn, e := sngecomm.CommonConnect(exampid, ltag, ll)
258 if e != nil {
259 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
260 exampid, ltag, sngecomm.Lcs,
261 e.Error()) // Handle this ......
265 sendMessages(conn, qnum, n)
267 ll.Printf("%stag:%s connsess:%s sends_complete qnum:%d\n",
268 exampid, ltag, conn.Session(),
269 qnum)
271 // Standard example disconnect sequence
272 e = sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll)
273 if e != nil {
274 ll.Fatalf("%stag:%s connsess:%s error:%s\n",
275 exampid, ltag, conn.Session(),
276 e.Error()) // Handle this ......
278 sngecomm.ShowStats(exampid, "send_"+fmt.Sprintf("%d", qnum), conn)
279 wgs.Done()
282 func main() {
284 st := time.Now()
286 sngecomm.ShowRunParms(exampid)
288 ll.Printf("%stag:%s connsess:%s main_starts\n",
289 exampid, tag, sngecomm.Lcs)
291 ll.Printf("%stag:%s connsess:%s main_profiling pprof:%v\n",
292 exampid, tag, sngecomm.Lcs,
293 sngecomm.Pprof())
295 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
296 exampid, tag, sngecomm.Lcs,
297 runtime.GOMAXPROCS(-1))
299 if sngecomm.SetMAXPROCS() {
300 nc := runtime.NumCPU()
301 ll.Printf("%stag:%s connsess:%s main_current_num_cpus cncpu:%v\n",
302 exampid, tag, sngecomm.Lcs,
304 gmp := runtime.GOMAXPROCS(nc)
305 ll.Printf("%stag:%s connsess:%s main_previous_num_cpus pncpu:%v\n",
306 exampid, tag, sngecomm.Lcs,
307 gmp)
308 ll.Printf("%stag:%s connsess:%s main_current_GOMAXPROCS gmp:%v\n",
309 exampid, tag, sngecomm.Lcs,
310 runtime.GOMAXPROCS(-1))
313 sw = sngecomm.SendWait()
314 rw = sngecomm.RecvWait()
315 sf = sngecomm.SendFactor()
316 rf = sngecomm.RecvFactor()
317 ll.Printf("%stag:%s connsess:%s main_wait_sleep_factors sw:%v rw:%v sf:%v rf:%v\n",
318 exampid, tag, sngecomm.Lcs,
319 sw, rw, sf, rf)
321 numq := sngecomm.Nqs()
322 nmsgs = senv.Nmsgs() // message count
324 ll.Printf("%stag:%s connsess:%s main_starting_receivers\n",
325 exampid, tag, sngecomm.Lcs)
326 for q := 1; q <= numq; q++ {
327 wgr.Add(1)
328 go runReceiver(q)
330 ll.Printf("%stag:%s connsess:%s main_started_receivers\n",
331 exampid, tag, sngecomm.Lcs)
333 ll.Printf("%stag:%s connsess:%s main_starting_senders\n",
334 exampid, tag, sngecomm.Lcs)
335 for q := 1; q <= numq; q++ {
336 wgs.Add(1)
337 go runSender(q)
339 ll.Printf("%stag:%s connsess:%s main_started_senders\n",
340 exampid, tag, sngecomm.Lcs)
342 wgs.Wait()
343 ll.Printf("%stag:%s connsess:%s main_senders_complete\n",
344 exampid, tag, sngecomm.Lcs)
345 wgr.Wait()
346 ll.Printf("%stag:%s connsess:%s main_receivers_complete\n",
347 exampid, tag, sngecomm.Lcs)
350 // The end
351 ll.Printf("%stag:%s connsess:%s main_elapsed:%v\n",
352 exampid, tag, sngecomm.Lcs,
353 time.Now().Sub(st))
354 time.Sleep(250 * time.Millisecond)