Rework writer deadline handling.
[stompngo.git] / subscribe.go
blob94abad75bf74740c740c235a31968394e8ed5503
1 //
2 // Copyright © 2011-2017 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "fmt"
21 "log"
22 "strconv"
25 var _ = fmt.Println
28 Subscribe to a STOMP subscription.
30 Headers MUST contain a "destination" header key.
32 All clients are recommended to supply a unique HK_ID header on Subscribe.
34 For STOMP 1.0 clients: if an "id" header is supplied, attempt to use it.
35 If the "id" header is not unique in the session, return an error. If no
36 "id" header is supplied, attempt to generate a unique subscription id based
37 on the destination name. If a unique subscription id cannot be generated,
38 return an error.
40 For STOMP 1.1+ clients: If any client does not supply an HK_ID header,
41 attempt to generate a unique "id". In all cases, do not allow duplicate
42 subscription "id"s in this session.
44 In summary, multiple subscriptions to the same destination are not allowed
45 unless a unique "id" is supplied.
47 For details about the returned MessageData channel, see: https://github.com/gmallard/stompngo/wiki/subscribe-and-messagedata
49 Example:
50 // Possible additional Header keys: "ack", "id".
51 h := stompngo.Headers{stompngo.HK_DESTINATION, "/queue/myqueue"}
52 s, e := c.Subscribe(h)
53 if e != nil {
54 // Do something sane ...
58 func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) {
59 c.log(SUBSCRIBE, "start", h, c.Protocol())
60 if !c.connected {
61 return nil, ECONBAD
63 e := checkHeaders(h, c.Protocol())
64 if e != nil {
65 return nil, e
67 e = c.checkSubscribeHeaders(h)
68 if e != nil {
69 return nil, e
71 ch := h.Clone()
72 if _, ok := ch.Contains(HK_ACK); !ok {
73 ch = append(ch, HK_ACK, AckModeAuto)
75 sub, e, ch := c.establishSubscription(ch)
76 if e != nil {
77 return nil, e
80 f := Frame{SUBSCRIBE, ch, NULLBUFF}
82 r := make(chan error)
83 c.output <- wiredata{f, r}
84 e = <-r
85 c.log(SUBSCRIBE, "end", ch, c.Protocol())
86 return sub.md, e
90 Check SUBSCRIBE specific requirements.
92 func (c *Connection) checkSubscribeHeaders(h Headers) error {
93 if _, ok := h.Contains(HK_DESTINATION); !ok {
94 return EREQDSTSUB
97 am, ok := h.Contains(HK_ACK)
99 switch c.Protocol() {
100 case SPL_10:
101 if ok { // Client supplied ack header
102 if !validAckModes10[am] {
103 return ESBADAM
106 case SPL_11:
107 fallthrough
108 case SPL_12:
109 if ok { // Client supplied ack header
110 if !(validAckModes10[am] || validAckModes1x[am]) {
111 return ESBADAM
114 default:
115 log.Fatalf("Internal protocol level error:<%s>\n", c.Protocol())
117 return nil
121 Handle subscribe id.
123 func (c *Connection) establishSubscription(h Headers) (*subscription, error, Headers) {
124 // c.log(SUBSCRIBE, "start establishSubscription")
126 id, hid := h.Contains(HK_ID)
127 uuid1 := Uuid()
128 sha11 := Sha1(h.Value(HK_DESTINATION))
130 c.subsLock.RLock() // Acquire Read lock
131 // No duplicates
132 if hid {
133 if _, q := c.subs[id]; q {
134 c.subsLock.RUnlock() // Release Read lock
135 return nil, EDUPSID, h // Duplicate subscriptions not allowed
137 if _, q := c.subs[sha11]; q {
138 c.subsLock.RUnlock() // Release Read lock
139 return nil, EDUPSID, h // Duplicate subscriptions not allowed
141 } else {
142 if _, q := c.subs[uuid1]; q {
143 c.subsLock.RUnlock() // Release Read lock
144 return nil, EDUPSID, h // Duplicate subscriptions not allowed
147 c.subsLock.RUnlock() // Release Read lock
149 sd := new(subscription) // New subscription data
150 if hid {
151 sd.id = id // Note user supplied id
153 sd.cs = false // No shutdown yet
154 sd.drav = false // Drain after value validity
155 sd.dra = 0 // Never drain MESSAGE frames
156 sd.drmc = 0 // Current drain count
157 sd.md = make(chan MessageData, c.scc) // Make subscription MD channel
158 sd.am = h.Value(HK_ACK) // Set subscription ack mode
160 if !hid {
161 // No caller supplied ID. This STOMP client package supplies one. It is the
162 // caller's responsibility for discover the value from subsequent message
163 // traffic.
164 switch c.Protocol() {
165 case SPL_10:
166 nsid := sha11 // This will be unique for a given estination
167 sd.id = nsid
168 h = h.Add(HK_ID, nsid)
169 case SPL_11:
170 fallthrough
171 case SPL_12:
172 sd.id = uuid1
173 h = h.Add(HK_ID, uuid1)
174 default:
175 log.Fatalf("Internal protocol level error:<%s>\n", c.Protocol())
179 // STOMP Protocol Enhancement
180 if dc, okda := h.Contains(StompPlusDrainAfter); okda {
181 n, e := strconv.ParseInt(dc, 10, 0)
182 if e != nil {
183 log.Printf("sng_drafter conversion error: %v\n", e)
184 } else {
185 sd.drav = true // Drain after value is OK
186 sd.dra = uint(n) // Drain after count
190 // This is a write lock
191 c.subsLock.Lock()
192 c.subs[sd.id] = sd // Add subscription to the connection subscription map
193 c.subsLock.Unlock()
194 //c.log(SUBSCRIBE, "end establishSubscription")
195 return sd, nil, h // Return the subscription pointer