Fix races found with stompngo_examples.
[stompngo.git] / subscribe.go
blob9c31bafb13cb684b11e091971b7081647a340463
1 //
2 // Copyright © 2011-2018 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "log"
22 "strconv"
25 var _ = fmt.Println
28 Subscribe to a STOMP subscription.
30 Headers MUST contain a "destination" header key.
32 All clients are recommended to supply a unique HK_ID header on Subscribe.
34 For STOMP 1.0 clients: if an "id" header is supplied, attempt to use it.
35 If the "id" header is not unique in the session, return an error. If no
36 "id" header is supplied, attempt to generate a unique subscription id based
37 on the destination name. If a unique subscription id cannot be generated,
38 return an error.
40 For STOMP 1.1+ clients: If any client does not supply an HK_ID header,
41 attempt to generate a unique "id". In all cases, do not allow duplicate
42 subscription "id"s in this session.
44 In summary, multiple subscriptions to the same destination are not allowed
45 unless a unique "id" is supplied.
47 For details about the returned MessageData channel, see: https://github.com/gmallard/stompngo/wiki/subscribe-and-messagedata
49 Example:
50 // Possible additional Header keys: "ack", "id".
51 h := stompngo.Headers{stompngo.HK_DESTINATION, "/queue/myqueue"}
52 s, e := c.Subscribe(h)
53 if e != nil {
54 // Do something sane ...
58 func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) {
59 c.log(SUBSCRIBE, "start", h, c.Protocol())
60 if !c.connected {
61 return nil, ECONBAD
63 e := checkHeaders(h, c.Protocol())
64 if e != nil {
65 return nil, e
67 e = c.checkSubscribeHeaders(h)
68 if e != nil {
69 return nil, e
71 ch := h.Clone()
72 if _, ok := ch.Contains(HK_ACK); !ok {
73 ch = append(ch, HK_ACK, AckModeAuto)
75 sub, e, ch := c.establishSubscription(ch)
76 if e != nil {
77 return nil, e
80 f := Frame{SUBSCRIBE, ch, NULLBUFF}
82 r := make(chan error)
83 if e = c.writeWireData(wiredata{f, r}); e != nil {
84 return nil, e
86 e = <-r
87 c.log(SUBSCRIBE, "end", ch, c.Protocol())
88 return sub.md, e
92 Check SUBSCRIBE specific requirements.
94 func (c *Connection) checkSubscribeHeaders(h Headers) error {
95 if _, ok := h.Contains(HK_DESTINATION); !ok {
96 return EREQDSTSUB
99 am, ok := h.Contains(HK_ACK)
101 switch c.Protocol() {
102 case SPL_10:
103 if ok { // Client supplied ack header
104 if !validAckModes10[am] {
105 return ESBADAM
108 case SPL_11:
109 fallthrough
110 case SPL_12:
111 if ok { // Client supplied ack header
112 if !(validAckModes10[am] || validAckModes1x[am]) {
113 return ESBADAM
116 default:
117 log.Fatalf("Internal protocol level error:<%s>\n", c.Protocol())
119 return nil
123 Handle subscribe id.
125 func (c *Connection) establishSubscription(h Headers) (*subscription, error, Headers) {
126 c.log(SUBSCRIBE, "start establishSubscription")
127 defer c.log(SUBSCRIBE, "end establishSubscription")
129 id, hid := h.Contains(HK_ID)
130 uuid1 := Uuid()
131 sha11 := Sha1(h.Value(HK_DESTINATION))
133 c.subsLock.RLock() // Acquire Read lock
134 // No duplicates
135 if hid {
136 if _, q := c.subs[id]; q {
137 c.subsLock.RUnlock() // Release Read lock
138 return nil, EDUPSID, h // Duplicate subscriptions not allowed
140 if _, q := c.subs[sha11]; q {
141 c.subsLock.RUnlock() // Release Read lock
142 return nil, EDUPSID, h // Duplicate subscriptions not allowed
144 } else {
145 if _, q := c.subs[uuid1]; q {
146 c.subsLock.RUnlock() // Release Read lock
147 return nil, EDUPSID, h // Duplicate subscriptions not allowed
150 c.subsLock.RUnlock() // Release Read lock
152 sd := new(subscription) // New subscription data
153 if hid {
154 sd.id = id // Note user supplied id
156 sd.cs = false // No shutdown yet
157 sd.drav = false // Drain after value validity
158 sd.dra = 0 // Never drain MESSAGE frames
159 sd.drmc = 0 // Current drain count
160 sd.md = make(chan MessageData, c.scc) // Make subscription MD channel
161 sd.am = h.Value(HK_ACK) // Set subscription ack mode
163 if !hid {
164 // No caller supplied ID. This STOMP client package supplies one. It is the
165 // caller's responsibility for discover the value from subsequent message
166 // traffic.
167 switch c.Protocol() {
168 case SPL_10:
169 nsid := sha11 // This will be unique for a given destination
170 sd.id = nsid
171 h = h.Add(HK_ID, nsid)
172 case SPL_11:
173 fallthrough
174 case SPL_12:
175 sd.id = uuid1
176 h = h.Add(HK_ID, uuid1)
177 default:
178 log.Fatalf("Internal protocol level error:<%s>\n", c.Protocol())
182 // STOMP Protocol Enhancement
183 if dc, okda := h.Contains(StompPlusDrainAfter); okda {
184 n, e := strconv.ParseInt(dc, 10, 0)
185 if e != nil {
186 log.Printf("sng_drafter conversion error: %v\n", e)
187 } else {
188 sd.drav = true // Drain after value is OK
189 sd.dra = uint(n) // Drain after count
193 // This is a write lock
194 c.subsLock.Lock()
195 c.subs[sd.id] = sd // Add subscription to the connection subscription map
196 c.subsLock.Unlock()
198 return sd, nil, h // Return the subscription pointer