Rework writer deadline handling.
[stompngo.git] / ack_test.go
blobf1811dfdd3ba98d29374e347e6beae66dcd744a5
1 //
2 // Copyright © 2011-2017 Guy M. Allard
3 //
4 // Licensed under the Apache License, Veridon 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permisidons and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "os"
22 "testing"
23 "time"
26 var _ = fmt.Println
29 Test Ack errors.
31 func TestAckErrors(t *testing.T) {
32 n, _ = openConn(t)
33 ch := login_headers
34 conn, e = Connect(n, ch)
35 if e != nil {
36 t.Fatalf("TestAckErrors CONNECT expected nil, got %v\n", e)
39 for _, tv := range terrList {
40 conn.protocol = tv.proto // Fake it
41 e = conn.Ack(tv.headers)
42 if e != tv.errval {
43 t.Fatalf("ACK -%s- expected error [%v], got [%v]\n",
44 tv.proto, tv.errval, e)
47 checkReceived(t, conn)
48 e = conn.Disconnect(empty_headers)
49 checkDisconnectError(t, e)
50 _ = closeConn(t, n)
54 Test Ack Same Connection.
56 func TestAckSameConn(t *testing.T) {
57 for _, sp := range Protocols() {
58 n, _ = openConn(t)
59 ch := login_headers
60 ch = headersProtocol(ch, sp)
61 conn, e = Connect(n, ch)
62 if e != nil {
63 t.Fatalf("TestAckSameConn CONNECT expected nil, got %v\n", e)
66 // Basic headers
67 wh := Headers{HK_DESTINATION,
68 tdest(TEST_TDESTPREF + "acksc1-" + conn.Protocol())}
69 // Subscribe Headers
70 sbh := wh.Add(HK_ACK, AckModeClient)
71 id := TEST_TDESTPREF + "acksc1.chkprotocol-" + conn.Protocol()
72 sbh = sbh.Add(HK_ID, id) // Always use an 'id'
73 ms := "acksc1 message 1"
75 // Subscribe
76 sc, e = conn.Subscribe(sbh)
77 if e != nil {
78 t.Fatalf("TestAckSameConn SUBSCRIBE expected [nil], got: [%v]\n", e)
82 // Send
83 sh := wh.Clone()
84 // For RabbitMQ and STOMP 1.0, do not add current-time header, where the
85 // value contains ':' characters.
86 switch conn.Protocol() {
87 case SPL_10:
88 if os.Getenv("STOMP_RMQ") == "" {
89 sh = sh.Add("current-time", time.Now().String()) // The added header value has ':' characters
91 default:
92 sh = sh.Add("current-time", time.Now().String()) // The added header value has ':' characters
94 e = conn.Send(sh, ms)
95 if e != nil {
96 t.Fatalf("TestAckSameConn SEND expected [nil], got: [%v]\n", e)
99 // Read MessageData
100 select {
101 case md = <-sc:
102 case md = <-conn.MessageData:
103 t.Fatalf("TestAckSameConn read channel error: expected [nil], got: [%v]\n",
104 md.Message.Command)
106 if md.Error != nil {
107 t.Fatalf("TestAckSameConn read error: expected [nil], got: [%v]\n",
108 md.Error)
110 if ms != md.Message.BodyString() {
111 t.Fatalf("TestAckSameConn message error: expected: [%v], got: [%v] Message: [%q]\n",
112 ms, md.Message.BodyString(), md.Message)
114 // Ack headers
115 ah := Headers{}
116 if conn.Protocol() == SPL_12 {
117 ah = ah.Add(HK_ID, md.Message.Headers.Value(HK_ACK))
118 } else {
119 ah = ah.Add(HK_MESSAGE_ID, md.Message.Headers.Value(HK_MESSAGE_ID))
122 if conn.Protocol() == SPL_11 {
123 ah = ah.Add(HK_SUBSCRIPTION, id) // Always use subscription for 1.1
125 // Ack
126 e = conn.Ack(ah)
127 if e != nil {
128 t.Fatalf("ACK expected [nil], got: [%v]\n", e)
130 // Make sure Apollo Jira issue APLO-88 stays fixed.
131 select {
132 case md = <-sc:
133 t.Fatalf("TestAckSameConn RECEIVE not expected, got: [%v]\n", md)
134 default:
137 // Unsubscribe
138 uh := wh.Add(HK_ID, id)
139 e = conn.Unsubscribe(uh)
140 if e != nil {
141 t.Fatalf("TestAckSameConn UNSUBSCRIBE expected [nil], got: [%v]\n", e)
145 checkReceived(t, conn)
146 e = conn.Disconnect(empty_headers)
147 checkDisconnectError(t, e)
148 _ = closeConn(t, n)
153 Test Ack Different Connection.
155 func TestAckDiffConn(t *testing.T) {
157 for _, sp := range Protocols() {
158 n, _ = openConn(t)
159 ch := login_headers
160 ch = headersProtocol(ch, sp)
161 conn, e = Connect(n, ch)
162 if e != nil {
163 t.Fatalf("TestAckDiffConn CONNECT expected nil, got %v\n", e)
166 // Basic headers
167 wh := Headers{HK_DESTINATION,
168 tdest(TEST_TDESTPREF + "acksc1-" + conn.Protocol())}
169 ms := "acksc1 message 1"
170 // Send
171 sh := wh.Clone()
172 // For RabbitMQ and STOMP 1.0, do not add current-time header, where the
173 // value contains ':' characters.
174 switch conn.Protocol() {
175 case SPL_10:
176 if os.Getenv("STOMP_RMQ") == "" {
177 sh = sh.Add("current-time", time.Now().String()) // The added header value has ':' characters
179 default:
180 sh = sh.Add("current-time", time.Now().String()) // The added header value has ':' characters
182 e = conn.Send(sh, ms)
183 if e != nil {
184 t.Fatalf("TestAckDiffConn SEND expected [nil], got: [%v]\n", e)
187 checkReceived(t, conn)
188 e = conn.Disconnect(empty_headers)
189 checkDisconnectError(t, e)
190 _ = closeConn(t, n)
192 n, _ = openConn(t)
193 ch = login_headers
194 ch = headersProtocol(ch, sp)
195 conn, e = Connect(n, ch) // Reconnect
196 if e != nil {
197 t.Fatalf("TestAckDiffConn Second Connect, expected no error, got:<%v>\n", e)
200 // Subscribe Headers
201 sbh := wh.Add(HK_ACK, AckModeClient)
202 id := TEST_TDESTPREF + "acksc1.chkprotocol-" + conn.Protocol()
203 sbh = sbh.Add(HK_ID, id) // Always use an 'id'
204 // Subscribe
205 sc, e = conn.Subscribe(sbh)
206 if e != nil {
207 t.Fatalf("TestAckDiffConn SUBSCRIBE expected [nil], got: [%v]\n", e)
209 // Read MessageData
210 select {
211 case md = <-sc:
212 case md = <-conn.MessageData:
213 t.Fatalf("TestAckDiffConn read channel error: expected [nil], got: [%v]\n",
214 md.Message.Command)
216 if md.Error != nil {
217 t.Fatalf("read error: expected [nil], got: [%v]\n", md.Error)
219 if ms != md.Message.BodyString() {
220 t.Fatalf("TestAckDiffConn message error: expected: [%v], got: [%v] Message: [%q]\n",
221 ms, md.Message.BodyString(), md.Message)
223 // Ack headers
224 ah := Headers{}
225 if conn.Protocol() == SPL_12 {
226 ah = ah.Add(HK_ID, md.Message.Headers.Value(HK_ACK))
227 } else {
228 ah = ah.Add(HK_MESSAGE_ID, md.Message.Headers.Value(HK_MESSAGE_ID))
231 if conn.Protocol() == SPL_11 {
232 ah = ah.Add(HK_SUBSCRIPTION, id) // Always use subscription for 1.1
234 // Ack
235 e = conn.Ack(ah)
236 if e != nil {
237 t.Fatalf("TestAckDiffConn ACK expected [nil], got: [%v]\n", e)
239 // Make sure Apollo Jira issue APLO-88 stays fixed.
240 select {
241 case md = <-sc:
242 t.Fatalf("TestAckDiffConn RECEIVE not expected, got: [%v]\n", md)
243 default:
245 // Unsubscribe
246 uh := wh.Add(HK_ID, id)
247 e = conn.Unsubscribe(uh)
248 if e != nil {
249 t.Fatalf("TestAckDiffConn UNSUBSCRIBE expected [nil], got: [%v]\n", e)
252 checkReceived(t, conn)
253 e = conn.Disconnect(empty_headers)
254 checkDisconnectError(t, e)
255 _ = closeConn(t, n)