Version bump.
[stompngo.git] / data.go
blob6f3a033f77730d7ec2bd8de8f2b8c62f6f5e6693
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "bufio"
21 "log"
22 "net"
23 "sync"
24 "time"
// Frame command names and protocol level strings used throughout the package.
// Each constant's value is the literal text shown on its right-hand side.
27 const (
29 // Client generated commands.
30 CONNECT = "CONNECT"
31 STOMP = "STOMP"
32 DISCONNECT = "DISCONNECT"
33 SEND = "SEND"
34 SUBSCRIBE = "SUBSCRIBE"
35 UNSUBSCRIBE = "UNSUBSCRIBE"
36 ACK = "ACK"
37 NACK = "NACK" // STOMP 1.1 or greater, per the NACK error constants in this file.
38 BEGIN = "BEGIN"
39 COMMIT = "COMMIT"
40 ABORT = "ABORT"
42 // Server generated commands.
// MESSAGE, RECEIPT and ERROR are also the keys of validCmds.
43 CONNECTED = "CONNECTED"
44 MESSAGE = "MESSAGE"
45 RECEIPT = "RECEIPT"
46 ERROR = "ERROR"
48 // Supported STOMP protocol levels; these three values populate the
// supported slice.
49 SPL_10 = "1.0"
50 SPL_11 = "1.1"
51 SPL_12 = "1.2"
55 What this package currently supports.
// supported lists the protocol level strings this package currently
// supports: SPL_10, SPL_11 and SPL_12.
57 var supported = []string{SPL_10, SPL_11, SPL_12}
60 Headers definition, a slice of string.
62 STOMP headers are key and value pairs. See the specification for more
63 information about STOMP frame headers.
65 Keys are found at even numbered indices. Values
66 are found at odd numbered indices. Headers are validated for an even
67 number of slice elements.
// Headers is a flat slice of strings holding alternating header keys and
// values: key at index 2n, its value at index 2n+1.
69 type Headers []string
72 Message is a STOMP Message, consisting of: a STOMP command; a set of STOMP
73 Headers; and a message body(payload), which is possibly empty.
// Message holds the three parts of a STOMP frame: command, headers and body.
75 type Message struct {
76 Command string // STOMP command of the frame
77 Headers Headers // Header key/value pairs of the frame
78 Body []uint8 // Message body (payload), possibly empty
82 Frame is an alternate name for a Message.
// Frame is a distinct named type with the same underlying struct as Message
// (Command, Headers, Body). It is not a type alias, so converting between the
// two requires an explicit conversion.
84 type Frame Message
87 MessageData passed to the client, containing: the Message; and an Error
88 value which is possibly nil.
90 Note that this has no bearing on whether a MessageData.Message.Command
91 value contains an "ERROR" generated by the broker.
// MessageData pairs a Message with an error value for delivery to the client.
93 type MessageData struct {
94 Message Message // The message itself
95 Error error // Error value, possibly nil; independent of an "ERROR" Command in Message
99 This is outbound on the wire.
// wiredata is the unit of outbound work: a frame plus an error channel.
101 type wiredata struct {
102 frame Frame // The frame to go out on the wire
103 errchan chan error // Error channel paired with this frame. NOTE(review): presumably the writer reports the send result here - confirm against the writer code.
107 Stomper is an interface that models STOMP specification commands.
// Stomper declares one method per client-side STOMP command. Every method
// takes the frame's Headers and returns an error; Subscribe additionally
// returns a receive-only channel of MessageData.
109 type Stomper interface {
110 Abort(h Headers) error
111 Ack(headers Headers) error
112 Begin(h Headers) error
113 Commit(h Headers) error
114 Disconnect(headers Headers) error
115 Nack(headers Headers) error
116 Send(Headers, string) error // Body supplied as a string
117 Subscribe(headers Headers) (<-chan MessageData, error)
118 Unsubscribe(headers Headers) error
120 SendBytes(h Headers, b []byte) error // Same shape as Send, with the body supplied as a byte slice
124 StatsReader is an interface that models a reader for the statistics
125 maintained by the stompngo package.
// StatsReader exposes four int64 counters: frames and bytes, read and written.
// NOTE(review): these presumably report the tfr/tbr/tfw/tbw fields of the
// metrics struct - confirm against the implementing methods.
127 type StatsReader interface {
128 FramesRead() int64
129 BytesRead() int64
130 FramesWritten() int64
131 BytesWritten() int64
135 HBDataReader is an interface that models a reader for the heart beat
136 data maintained by the stompngo package.
// HBDataReader exposes the heart beat ticker intervals and ticker counts for
// the send and receive directions, all as int64.
// NOTE(review): these presumably report the sti/rti/sc/rc fields of
// heartBeatData (intervals documented there as ns) - confirm against the
// implementing methods.
138 type HBDataReader interface {
139 SendTickerInterval() int64
140 ReceiveTickerInterval() int64
141 SendTickerCount() int64
142 ReceiveTickerCount() int64
146 Deadliner is an interface that models the optional network deadline
147 functionality implemented by the stompngo package.
// Deadliner groups the setters and queries for the optional read and write
// network deadlines. The ExpiredNotification type used below is declared
// outside this listing.
149 type Deadliner interface {
150 WriteDeadline(d time.Duration) // Takes the write deadline as a duration
151 EnableWriteDeadline(e bool)
152 ExpiredNotification(enf ExpiredNotification)
153 IsWriteDeadlineEnabled() bool
154 ReadDeadline(d time.Duration) // Takes the read deadline as a duration
155 EnableReadDeadline(e bool)
156 IsReadDeadlineEnabled() bool
157 ShortWriteRecovery(ro bool)
161 Monitor is an interface that models monitoring a stompngo connection.
// Monitor exposes read-only state of a connection.
// NOTE(review): Connected, Session, Protocol and SubChanCap presumably report
// the Connection fields connected, session, protocol and scc - confirm
// against the implementing methods.
163 type Monitor interface {
164 Connected() bool
165 Session() string
166 Protocol() string
167 Running() time.Duration
168 SubChanCap() int
172 ParmHandler is an interface that models stompngo client parameter
173 specification.
// ParmHandler covers client-settable parameters: the *log.Logger in use and
// the subscribe channel capacity.
175 type ParmHandler interface {
176 SetLogger(l *log.Logger)
177 GetLogger() *log.Logger
178 SetSubChanCap(nc int)
182 STOMPConnector is an interface that encapsulates the Connection struct.
// STOMPConnector is the union of the six interfaces declared above it in this
// file; it adds no methods of its own.
184 type STOMPConnector interface {
185 Stomper
186 StatsReader
187 HBDataReader
188 Deadliner
189 Monitor
190 ParmHandler
195 Connection is a representation of a STOMP connection.
// Connection carries all per-connection state. Only ConnectResponse,
// DisconnectReceipt, MessageData, Hbrf and Hbsf are exported; everything else
// is package-private. The struct contains mutexes and a sync.Once, so it must
// not be copied after first use.
197 type Connection struct {
198 ConnectResponse *Message // Broker response (CONNECTED/ERROR) if physical connection successful.
199 DisconnectReceipt MessageData // If receipt requested on DISCONNECT.
200 MessageData <-chan MessageData // Inbound data for the client.
201 connected bool // Guarded by connLock
202 connLock sync.Mutex // connected variable lock
203 session string // Guarded by sessLock
204 sessLock sync.Mutex // session variable lock
205 protocol string // Guarded by protoLock
206 protoLock sync.Mutex // protocol variable lock
207 input chan MessageData // NOTE(review): presumably the bidirectional side of the MessageData channel - confirm
208 output chan wiredata // Outbound frames (wiredata is what goes out on the wire)
209 netconn net.Conn // Underlying network connection
210 subs map[string]*subscription // Subscriptions. NOTE(review): presumably keyed by subscription id - confirm
211 subsLock sync.RWMutex // NOTE(review): presumably guards subs - confirm
212 ssdc chan struct{} // System shutdown channel
213 abortOnce sync.Once // Ensure close ssdc once
214 wtrsdc chan struct{} // Special writer shutdown channel
215 hbd *heartBeatData // Heart beat control data
216 wtr *bufio.Writer // Buffered writer. NOTE(review): presumably wraps netconn - confirm
217 rdr *bufio.Reader // Buffered reader. NOTE(review): presumably wraps netconn - confirm
218 Hbrf bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
219 Hbsf bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
220 logger *log.Logger // Logger set and read through ParmHandler-style methods
221 mets *metrics // Client metrics
222 scc int // Subscribe channel capacity
223 discLock sync.Mutex // DISCONNECT lock
224 dld *deadlineData // Deadline data
// subscription is the per-subscription record stored in Connection.subs.
// The dra* and drmc fields carry the "drain after" bookkeeping.
// NOTE(review): that bookkeeping presumably backs the StompPlusDrainAfter
// extension header - confirm at the use site.
227 type subscription struct {
228 md chan MessageData // Subscription specific MessageData channel
229 id string // Subscription id (unique, self reference)
230 am string // ACK mode for this subscription
231 cs bool // Closed during shutdown
232 drav bool // Drain After value validity
233 dra uint // Start draining after # messages (MESSAGE frames)
234 drmc uint // Current drain count if draining
238 Error definition.
// Error is the string-based type of the package's error constants (ECONERR
// and the rest of the const block below are Error values).
// NOTE(review): the Error() method needed to satisfy the error interface is
// not in this listing - presumably defined elsewhere in the package.
240 type Error string
243 Error constants.
// Error constants, grouped by the operation or validation they belong to.
// Each is an Error whose text is the literal shown. Two pairs share identical
// text: EREQDIUNS/EREQDSTUNS and EREQIDUNS/EUNOSID.
245 const (
246 // ERROR Frame returned by broker on connect.
247 ECONERR = Error("broker returned ERROR frame, CONNECT")
249 // ERRORs for Headers.
250 EHDRLEN = Error("unmatched headers, bad length")
251 EHDRUTF8 = Error("header string not UTF8")
252 EHDRNIL = Error("headers can not be nil")
253 EUNKHDR = Error("corrupt frame headers")
254 EHDRMTK = Error("header key can not be empty")
255 EHDRMTV = Error("header value can not be empty")
257 // ERRORs for response to CONNECT.
258 EUNKFRM = Error("unrecognized frame returned, CONNECT")
259 EBADFRM = Error("Malformed frame")
260 EBADSSLP = Error("Got HandShake data, wrong SSL port?")
262 // No body allowed error
263 EBDYDATA = Error("body data not allowed")
265 // Not connected.
266 ECONBAD = Error("no current connection or DISCONNECT previously completed")
268 // Destination required
269 EREQDSTSND = Error("destination required, SEND")
270 EREQDSTSUB = Error("destination required, SUBSCRIBE")
271 EREQDIUNS = Error("destination required, UNSUBSCRIBE") // Same text as EREQDSTUNS
272 EREQDSTUNS = Error("destination required, UNSUBSCRIBE") // Alternate name
274 // id required
275 EREQIDUNS = Error("id required, UNSUBSCRIBE") // Same text as EUNOSID
277 // Message ID required.
278 EREQMIDACK = Error("message-id required, ACK") // 1.0, 1.1
279 EREQIDACK = Error("id required, ACK") // 1.2
281 // Subscription required.
282 EREQSUBACK = Error("subscription required, ACK") // 1.1
284 // NACK's. STOMP 1.1 or greater.
285 EREQMIDNAK = Error("message-id required, NACK") // 1.1
286 EREQSUBNAK = Error("subscription required, NACK") // 1.1
287 EREQIDNAK = Error("id required, NACK") // 1.2
289 // Transaction ID required.
290 EREQTIDBEG = Error("transaction-id required, BEGIN")
291 EREQTIDCOM = Error("transaction-id required, COMMIT")
292 EREQTIDABT = Error("transaction-id required, ABORT")
294 // Transaction ID present but empty.
295 ETIDBEGEMT = Error("transaction-id empty, BEGIN")
296 ETIDCOMEMT = Error("transaction-id empty, COMMIT")
297 ETIDABTEMT = Error("transaction-id empty, ABORT")
299 // Host header required, STOMP 1.1+
300 EREQHOST = Error("host header required for STOMP 1.1+")
302 // Subscription errors.
303 EDUPSID = Error("duplicate subscription-id")
304 EBADSID = Error("invalid subscription-id")
306 // Subscribe errors.
307 ESBADAM = Error("invalid ackmode, SUBSCRIBE")
309 // Unsubscribe error.
310 EUNOSID = Error("id required, UNSUBSCRIBE") // Same text as EREQIDUNS
311 EUNODSID = Error("destination or id required, UNSUBSCRIBE") // 1.0
313 // Unsupported version error.
314 EBADVERCLI = Error("unsupported protocol version, client")
315 EBADVERSVR = Error("unsupported protocol version, server")
316 EBADVERNAK = Error("unsupported protocol version, NACK")
318 // Unsupported Headers type.
319 EBADHDR = Error("unsupported Headers type")
321 // Receipt not allowed on connect
322 ENORECPT = Error("receipt not allowed on CONNECT")
324 // Invalid broker command
325 EINVBCMD = Error("invalid broker command")
327 // Invalid receipt-id string
328 EBADRID = Error("invalid receipt-id")
330 // DISCONNECT timeout
331 EDISCTO = Error("DISCONNECT timeout")
335 A zero length buffer for convenience.
// NULLBUFF is a non-nil, zero-length []uint8. It is an exported package
// variable, so all users share the same slice value.
337 var NULLBUFF = make([]uint8, 0)
340 A no disconnect receipt Headers value for convenience.
// NoDiscReceipt is a ready-made Headers value holding the single pair
// "noreceipt" = "true".
// NOTE(review): presumably passed to Disconnect to suppress the receipt
// request - confirm in the Disconnect implementation.
342 var NoDiscReceipt = Headers{"noreceipt", "true"}
345 Codec data structure definition.
// codecdata is one encode/decode pair: the escaped form and the text it
// stands for.
347 type codecdata struct {
348 encoded string // Escaped form
349 decoded string // Unescaped form
353 STOMP specification defined encoded / decoded values for the Message
354 command and headers.
// codecValues is the table of escape pairs, each written as
// {encoded, decoded}. The backslash pair comes first.
// NOTE(review): whether users of this table depend on that order is not
// visible here.
356 var codecValues = []codecdata{
357 codecdata{"\\\\", "\\"}, // two backslashes <-> one backslash
358 codecdata{"\\" + "n", "\n"}, // backslash + 'n' <-> line feed
359 codecdata{"\\" + "r", "\r"}, // backslash + 'r' <-> carriage return
360 codecdata{"\\c", ":"}, // backslash + 'c' <-> colon
364 Control data for initialization of heartbeats with STOMP 1.1+, and the
365 subsequent control of any heartbeat routines.
// heartBeatData holds heart beat state for one connection: locks, the
// negotiated client/server values, local ticker settings and counters,
// shutdown channels and last-activity timestamps. Note the unit split: the
// cx/cy/sx/sy values are milliseconds, while ticker intervals and timestamps
// are nanoseconds.
367 type heartBeatData struct {
368 sdl sync.Mutex // Send data lock
369 rdl sync.Mutex // Receive data lock
370 clk sync.Mutex // Shutdown lock
371 ssdn bool // Shutdown complete
373 cx int64 // client send value, ms
374 cy int64 // client receive value, ms
375 sx int64 // server send value, ms
376 sy int64 // server receive value, ms
378 hbs bool // sending heartbeats
379 hbr bool // receiving heartbeats
381 sti int64 // local sender ticker interval, ns
382 rti int64 // local receiver ticker interval, ns
384 sc int64 // local sender ticker count
385 rc int64 // local receiver ticker count
387 ssd chan struct{} // sender shutdown channel
388 rsd chan struct{} // receiver shutdown channel
390 ls int64 // last send time, ns
391 lr int64 // last receive time, ns
395 Control structure for basic client metrics.
// metrics holds the start time and the four read/write totals for one
// connection (referenced from Connection.mets).
// NOTE(review): no lock is declared in this struct; how concurrent updates
// are synchronised is not visible here.
397 type metrics struct {
398 st time.Time // Start Time
399 tfr int64 // Total frame reads
400 tbr int64 // Total bytes read
401 tfw int64 // Total frame writes
402 tbw int64 // Total bytes written
406 Valid broker commands.
// validCmds is the set of accepted broker commands: MESSAGE, ERROR and
// RECEIPT. A lookup for any other command yields false (the map zero value).
408 var validCmds = map[string]bool{MESSAGE: true, ERROR: true, RECEIPT: true}
// logLock is a package-level mutex.
// NOTE(review): presumably serialises logging across connections - confirm
// at the use site.
410 var logLock sync.Mutex
// Network protocol name constant.
412 const (
413 NetProtoTCP = "tcp" // Protocol Name
// HandShake is a fixed four-byte sequence: 0x15 0x03 0x03 0x00.
// NOTE(review): presumably compared with the first bytes read from the broker
// to detect a TLS endpoint reached without TLS (see EBADSSLP, "Got HandShake
// data, wrong SSL port?") - confirm at the use site.
416 var HandShake = []byte{0x15, 0x03, 0x03, 0x00}
419 Common Header keys
// Header key constants. HK_HOST and HK_VHOST are two names for the same key,
// "host". HK_SUPPRESS_CL and HK_SUPPRESS_CT are not defined by any STOMP
// specification.
421 const (
422 HK_ACCEPT_VERSION = "accept-version"
423 HK_ACK = "ack"
424 HK_CONTENT_TYPE = "content-type"
425 HK_CONTENT_LENGTH = "content-length"
426 HK_DESTINATION = "destination"
427 HK_HEART_BEAT = "heart-beat"
428 HK_HOST = "host" // HK_VHOST alias
429 HK_ID = "id"
430 HK_LOGIN = "login"
431 HK_MESSAGE = "message"
432 HK_MESSAGE_ID = "message-id"
433 HK_SUPPRESS_CL = "suppress-content-length" // Not in any spec, but used
434 HK_SUPPRESS_CT = "suppress-content-type" // Not in any spec, but used
435 HK_PASSCODE = "passcode"
436 HK_RECEIPT = "receipt"
437 HK_RECEIPT_ID = "receipt-id"
438 HK_SESSION = "session"
439 HK_SERVER = "server"
440 HK_SUBSCRIPTION = "subscription"
441 HK_TRANSACTION = "transaction"
442 HK_VERSION = "version"
443 HK_VHOST = "host" // HK_HOST alias
447 ACK Modes
// ACK mode strings. These are the keys of the validAckModes10 and
// validAckModes1x maps.
449 const (
450 AckModeAuto = "auto"
451 AckModeClient = "client"
452 AckModeClientIndividual = "client-individual"
// Sets of valid ACK modes. validAckModes10 holds auto and client;
// validAckModes1x holds only client-individual.
// NOTE(review): presumably 1.1+ validation consults both maps, since 1x alone
// excludes auto and client - confirm at the use site.
455 var (
456 validAckModes10 = map[string]bool{AckModeAuto: true,
457 AckModeClient: true}
458 validAckModes1x = map[string]bool{AckModeClientIndividual: true}
462 Default content-type.
// DFLT_CONTENT_TYPE is the default content-type value, plain text in UTF-8.
// NOTE(review): presumably applied when the caller supplies no content-type
// header - confirm at the use site.
464 const (
465 DFLT_CONTENT_TYPE = "text/plain; charset=UTF-8"
469 Extensions to STOMP protocol.
// Header keys for this package's extensions to the STOMP protocol; they are
// not part of the STOMP specification.
471 const (
472 StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header
473 StompPlusDrainNow = "sng_drnow" // UNSUBSCRIBE Header
// Shared single-byte slices. Both are exported variables, so callers must not
// modify their contents.
476 var (
477 LFB = []byte("\n") // A single line feed byte
478 ZRB = []byte{0} // A single zero (NUL) byte