Correct spelling in error message.
[stompngo.git] / unsubscribe.go
blob2840caa85c9f35468707a9689134a60ef5432308
1 //
2 // Copyright © 2011-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "strconv"
21 "time"
25 Unsubscribe from a STOMP subscription.
27 Headers MUST contain a "destination" header key, and for Stomp 1.1+,
28 a "id" header key per the specifications. The subscription MUST currently
29 exist for this session.
31 Example:
32 // Possible additional Header keys: "id".
33 h := stompngo.Headers{stompngo.HK_DESTINATION, "/queue/myqueue"}
34 e := c.Unsubscribe(h)
35 if e != nil {
36 // Do something sane ...
40 func (c *Connection) Unsubscribe(h Headers) error {
41 c.log(UNSUBSCRIBE, "start", h)
42 // fmt.Printf("Unsub Headers: %v\n", h)
43 if !c.connected {
44 return ECONBAD
46 e := checkHeaders(h, c.Protocol())
47 if e != nil {
48 return e
51 // Specification Requirements:
52 // 1.0) requires either a destination header or an id header
53 // 1.1) ... requires ... the id header ....
54 // 1.2) an id header MUST be included in the frame
56 _, okd := h.Contains(HK_DESTINATION)
57 shid, oki := h.Contains(HK_ID)
58 switch c.Protocol() {
59 case SPL_12:
60 if !oki {
61 return EUNOSID
63 case SPL_11:
64 if !oki {
65 return EUNOSID
67 case SPL_10:
68 if !oki && !okd {
69 return EUNODSID
71 default:
72 panic("unsubscribe version not supported: " + c.Protocol())
75 shaid := Sha1(h.Value(HK_DESTINATION)) // Special for 1.0
76 c.subsLock.RLock()
77 s1x, p := c.subs[shid]
78 s10, ps := c.subs[shaid] // Special for 1.0
79 c.subsLock.RUnlock()
80 var usesp *subscription
81 usekey := ""
83 switch c.Protocol() {
84 case SPL_12:
85 fallthrough
86 case SPL_11:
87 if !oki {
88 return EUNOSID // id required
90 if !p { // subscription does not exist
91 return EBADSID // invalid subscription-id
93 usekey = shid
94 usesp = s1x
95 case SPL_10:
96 if !p && !ps {
97 return EUNODSID
99 usekey = shaid
100 usesp = s10
101 default:
102 panic("unsubscribe version not supported: " + c.Protocol())
105 sdn, ok := h.Contains(StompPlusDrainNow) // STOMP Protocol Extension
107 if !ok {
108 e = c.transmitCommon(UNSUBSCRIBE, h) // transmitCommon Clones() the headers
109 if e != nil {
110 return e
113 c.subsLock.Lock()
114 delete(c.subs, usekey)
115 c.subsLock.Unlock()
116 c.log(UNSUBSCRIBE, "end", h)
117 return nil
120 // STOMP Protocol Extension
122 c.log("sngdrnow extension detected")
123 idn, err := strconv.ParseInt(sdn, 10, 64)
124 if err != nil {
125 idn = 100 // 100 milliseconds if bad parameter
127 ival := time.Duration(idn * 1000000)
128 dmc := 0
129 forsel:
130 for {
131 ticker := time.NewTicker(ival)
132 select {
133 case mi, ok := <-usesp.md:
134 if !ok {
135 break forsel
137 dmc++
138 c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers)
139 case _ = <-ticker.C:
140 c.log("sngdrnow extension BREAK")
141 break forsel
145 c.log("sngdrnow extension at very end")
146 c.subsLock.Lock()
147 delete(c.subs, usekey)
148 c.subsLock.Unlock()
149 c.log(UNSUBSCRIBE, "endsngdrnow", h)
150 return nil