Address races for conn.connected.
[stompngo.git] / writer.go
blobe24c0e3e8015a9c4c4fc5c28952f830a8073f610
1 //
2 // Copyright © 2011-2019 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "bufio"
21 "bytes"
22 "net"
24 // "bytes"
25 "strconv"
26 "time"
30 Write data to logical network writer. Writer will take care of the output wire data.
31 If the underlying connection goes bad and the writer gives up working, the closed ssdc chan
32 will make sure the write action is aware that this has happened.
34 func (c *Connection) writeWireData(wd wiredata) error {
35 select {
36 case c.output <- wd:
37 case <-c.ssdc:
38 return ECONBAD
40 return nil
44 Logical network writer. Read wiredata structures from the communication
45 channel, and put the frame on the wire.
47 func (c *Connection) writer() {
48 writerLoop:
49 for {
50 select {
51 case d := <-c.output:
52 c.log("WTR_WIREWRITE start")
53 c.wireWrite(d)
54 logLock.Lock()
55 if c.logger != nil {
56 c.logx("WTR_WIREWRITE COMPLETE", d.frame.Command, d.frame.Headers,
57 HexData(d.frame.Body))
59 logLock.Unlock()
60 if d.frame.Command == DISCONNECT {
61 break writerLoop // we are done with this connection
63 case _ = <-c.ssdc:
64 c.log("WTR_WIREWRITE shutdown S received")
65 break writerLoop
66 case _ = <-c.wtrsdc:
67 c.log("WTR_WIREWRITE shutdown W received")
68 break writerLoop
70 } // of for
72 c.setConnected(false)
73 c.sysAbort()
74 c.log("WTR_SHUTDOWN", time.Now())
78 Connection logical write.
80 func (c *Connection) wireWrite(d wiredata) {
81 f := &d.frame
82 // fmt.Printf("WWD01 f:[%v]\n", f)
83 switch f.Command {
84 case "\n": // HeartBeat frame
85 if c.dld.wde && c.dld.wds {
86 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
88 _, e := c.wtr.WriteString(f.Command)
89 if e != nil {
90 if e.(net.Error).Timeout() {
91 if c.dld.dns {
92 c.dld.dlnotify(e, true)
95 d.errchan <- e
96 return
98 default: // Other frames
99 if e := f.writeFrame(c.wtr, c); e != nil {
100 d.errchan <- e
101 return
103 if e := c.wtr.Flush(); e != nil {
104 d.errchan <- e
105 return
108 if e := c.wtr.Flush(); e != nil {
109 d.errchan <- e
110 return
113 if c.hbd != nil {
114 c.hbd.sdl.Lock()
115 c.hbd.ls = time.Now().UnixNano() // Latest good send
116 c.hbd.sdl.Unlock()
118 c.mets.tfw++ // Frame written count
119 c.mets.tbw += f.Size(false) // Bytes written count
121 d.errchan <- nil
122 return
126 Physical frame write to the wire.
128 func (f *Frame) writeFrame(w *bufio.Writer, c *Connection) error {
130 var sctok bool
131 // Content type. Always add it if the client does not suppress and does not
132 // supply it.
133 _, sctok = f.Headers.Contains(HK_SUPPRESS_CT)
134 if !sctok {
135 if _, ctok := f.Headers.Contains(HK_CONTENT_TYPE); !ctok {
136 f.Headers = append(f.Headers, HK_CONTENT_TYPE,
137 DFLT_CONTENT_TYPE)
141 var sclok bool
142 // Content length - Always add it if client does not suppress it and
143 // does not supply it.
144 _, sclok = f.Headers.Contains(HK_SUPPRESS_CL)
145 if !sclok {
146 if _, clok := f.Headers.Contains(HK_CONTENT_LENGTH); !clok {
147 f.Headers = append(f.Headers, HK_CONTENT_LENGTH, strconv.Itoa(len(f.Body)))
150 // Encode the headers if needed
151 if c.Protocol() > SPL_10 && f.Command != CONNECT {
152 for i := 0; i < len(f.Headers); i += 2 {
153 f.Headers[i] = encode(f.Headers[i])
154 f.Headers[i+1] = encode(f.Headers[i+1])
158 if sclok {
159 nz := bytes.IndexByte(f.Body, 0)
160 // fmt.Printf("WDBG41 ok:%v\n", nz)
161 if nz == 0 {
162 f.Body = []byte{}
163 // fmt.Printf("WDBG42 body:%v bodystring: %v\n", f.Body, string(f.Body))
164 } else if nz > 0 {
165 f.Body = f.Body[0:nz]
166 // fmt.Printf("WDBG43 body:%v bodystring: %v\n", f.Body, string(f.Body))
170 if c.dld.wde && c.dld.wds {
171 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
174 // Writes start
176 // Write the frame Command
177 _, e := w.WriteString(f.Command + "\n")
178 if c.checkWriteError(e) != nil {
179 return e
181 // fmt.Println("WRCMD", f.Command)
182 // Write the frame Headers
183 for i := 0; i < len(f.Headers); i += 2 {
184 if c.dld.wde && c.dld.wds {
185 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
187 _, e := w.WriteString(f.Headers[i] + ":" + f.Headers[i+1] + "\n")
188 if c.checkWriteError(e) != nil {
189 return e
191 // fmt.Println("WRHDR", f.Headers[i]+":"+f.Headers[i+1]+"\n")
194 // Write the last Header LF
195 if c.dld.wde && c.dld.wds {
196 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
198 e = w.WriteByte('\n')
199 if c.checkWriteError(e) != nil {
200 return e
202 // fmt.Printf("WDBG40 ok:%v\n", sclok)
204 // Write the body
205 if len(f.Body) != 0 { // Foolish to write 0 length data
206 // fmt.Println("WRBDY", f.Body)
207 e := c.writeBody(f)
208 if c.checkWriteError(e) != nil {
209 return e
212 if c.dld.wde && c.dld.wds {
213 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
215 e = w.WriteByte(0)
216 if c.checkWriteError(e) != nil {
217 return e
219 // End of write loop - set no deadline
220 if c.dld.wde {
221 _ = c.netconn.SetWriteDeadline(c.dld.t0)
223 return nil
226 func (c *Connection) checkWriteError(e error) error {
227 if e == nil {
228 return e
230 ne, ok := e.(net.Error)
231 if !ok {
232 return e
234 if ne.Timeout() {
235 if c.dld.dns {
236 c.log("invoking write deadline callback 1")
237 c.dld.dlnotify(e, true)
240 return e
243 func (c *Connection) writeBody(f *Frame) error {
244 // fmt.Printf("WDBG99 body:%v bodystring: %v\n", f.Body, string(f.Body))
245 var n = 0
246 var e error
247 for {
248 if c.dld.wde && c.dld.wds {
249 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
251 n, e = c.wtr.Write(f.Body)
252 if n == len(f.Body) {
253 return e
255 c.log("SHORT WRITE", n, len(f.Body))
256 if n == 0 { // Zero bytes would mean something is seriously wrong.
257 return e
259 if !c.dld.rfsw {
260 return e
262 if c.dld.wde && c.dld.wds && c.dld.dns && isErrorTimeout(e) {
263 c.log("invoking write deadline callback 2")
264 c.dld.dlnotify(e, true)
266 // *Any* error from a bufio.Writer is *not* recoverable. See code in
267 // bufio.go to understand this. We get a new writer here, to clear any
268 // error condition.
269 c.wtr = bufio.NewWriter(c.netconn) // Create new writer
270 f.Body = f.Body[n:]
// isErrorTimeout reports whether e is a network error caused by a timeout.
// It returns false for a nil error, for errors that do not implement
// net.Error, and for net.Error values whose Timeout method returns false.
func isErrorTimeout(e error) bool {
	// The checked assertion is false for nil, so no separate nil test is needed.
	ne, ok := e.(net.Error)
	return ok && ne.Timeout()
}