Rework writer deadline handling.
[stompngo.git] / writer.go
blobdec668678db12fb15303c922d5a7399f3b192d85
1 //
2 // Copyright © 2011-2017 Guy M. Allard
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 package stompngo
19 import (
20 "bufio"
21 "bytes"
22 "net"
23 // "bytes"
24 "strconv"
25 "time"
29 Logical network writer. Read wiredata structures from the communication
30 channel, and put the frame on the wire.
32 func (c *Connection) writer() {
33 writerLoop:
34 for {
35 select {
36 case d := <-c.output:
37 c.log("WTR_WIREWRITE start")
38 c.wireWrite(d)
39 c.log("WTR_WIREWRITE COMPLETE", d.frame.Command, d.frame.Headers,
40 HexData(d.frame.Body))
41 if d.frame.Command == DISCONNECT {
42 break writerLoop // we are done with this connection
44 case _ = <-c.ssdc:
45 c.log("WTR_WIREWRITE shutdown S received")
46 break writerLoop
47 case _ = <-c.wtrsdc:
48 c.log("WTR_WIREWRITE shutdown W received")
49 break writerLoop
51 } // of for
53 c.log("WTR_SHUTDOWN", time.Now())
57 Connection logical write.
59 func (c *Connection) wireWrite(d wiredata) {
60 f := &d.frame
61 // fmt.Printf("WWD01 f:[%v]\n", f)
62 switch f.Command {
63 case "\n": // HeartBeat frame
64 if c.dld.wde && c.dld.dns {
65 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
67 _, e := c.wtr.WriteString(f.Command)
68 if e != nil {
69 if e.(net.Error).Timeout() {
70 if c.dld.dns {
71 c.dld.dlnotify(e, true)
74 d.errchan <- e
75 return
77 default: // Other frames
78 if e := f.writeFrame(c.wtr, c); e != nil {
79 d.errchan <- e
80 return
82 if e := c.wtr.Flush(); e != nil {
83 d.errchan <- e
84 return
87 if e := c.wtr.Flush(); e != nil {
88 d.errchan <- e
89 return
92 if c.hbd != nil {
93 c.hbd.sdl.Lock()
94 c.hbd.ls = time.Now().UnixNano() // Latest good send
95 c.hbd.sdl.Unlock()
97 c.mets.tfw++ // Frame written count
98 c.mets.tbw += f.Size(false) // Bytes written count
100 d.errchan <- nil
101 return
105 Physical frame write to the wire.
107 func (f *Frame) writeFrame(w *bufio.Writer, c *Connection) error {
109 var sctok bool
110 // Content type. Always add it if the client does not suppress and does not
111 // supply it.
112 _, sctok = f.Headers.Contains(HK_SUPPRESS_CT)
113 if !sctok {
114 if _, ctok := f.Headers.Contains(HK_CONTENT_TYPE); !ctok {
115 f.Headers = append(f.Headers, HK_CONTENT_TYPE,
116 DFLT_CONTENT_TYPE)
120 var sclok bool
121 // Content length - Always add it if client does not suppress it and
122 // does not supply it.
123 _, sclok = f.Headers.Contains(HK_SUPPRESS_CL)
124 if !sclok {
125 if _, clok := f.Headers.Contains(HK_CONTENT_LENGTH); !clok {
126 f.Headers = append(f.Headers, HK_CONTENT_LENGTH, strconv.Itoa(len(f.Body)))
129 // Encode the headers if needed
130 if c.Protocol() > SPL_10 && f.Command != CONNECT {
131 for i := 0; i < len(f.Headers); i += 2 {
132 f.Headers[i] = encode(f.Headers[i])
133 f.Headers[i+1] = encode(f.Headers[i+1])
137 if sclok {
138 nz := bytes.IndexByte(f.Body, 0)
139 // fmt.Printf("WDBG41 ok:%v\n", nz)
140 if nz == 0 {
141 f.Body = []byte{}
142 // fmt.Printf("WDBG42 body:%v bodystring: %v\n", f.Body, string(f.Body))
143 } else if nz > 0 {
144 f.Body = f.Body[0:nz]
145 // fmt.Printf("WDBG43 body:%v bodystring: %v\n", f.Body, string(f.Body))
149 if c.dld.wde && c.dld.dns {
150 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
153 // Writes start
155 // Write the frame Command
156 _, e := w.WriteString(f.Command + "\n")
157 if c.checkWriteError(e) != nil {
158 return e
160 // fmt.Println("WRCMD", f.Command)
161 // Write the frame Headers
162 for i := 0; i < len(f.Headers); i += 2 {
163 if c.dld.wde && c.dld.dns {
164 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
166 _, e := w.WriteString(f.Headers[i] + ":" + f.Headers[i+1] + "\n")
167 if c.checkWriteError(e) != nil {
168 return e
170 // fmt.Println("WRHDR", f.Headers[i]+":"+f.Headers[i+1]+"\n")
173 // Write the last Header LF
174 if c.dld.wde && c.dld.dns {
175 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
177 e = w.WriteByte('\n')
178 if c.checkWriteError(e) != nil {
179 return e
181 // fmt.Printf("WDBG40 ok:%v\n", sclok)
183 // Write the body
184 if len(f.Body) != 0 { // Foolish to write 0 length data
185 // fmt.Println("WRBDY", f.Body)
186 e := c.writeBody(f)
187 if c.checkWriteError(e) != nil {
188 return e
191 if c.dld.wde && c.dld.dns {
192 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
194 e = w.WriteByte(0)
195 if c.checkWriteError(e) != nil {
196 return e
198 return nil
201 func (c *Connection) checkWriteError(e error) error {
202 if e == nil {
203 return e
205 ne, ok := e.(net.Error)
206 if !ok {
207 return e
209 if ne.Timeout() {
210 if c.dld.dns {
211 c.log("invoking write deadline callback 1")
212 c.dld.dlnotify(e, true)
215 return e
218 func (c *Connection) writeBody(f *Frame) error {
219 // fmt.Printf("WDBG99 body:%v bodystring: %v\n", f.Body, string(f.Body))
220 var n = 0
221 var e error
222 for {
223 if c.dld.wde && c.dld.dns {
224 _ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
226 n, e = c.wtr.Write(f.Body)
227 if n == len(f.Body) {
228 return e
230 c.log("SHORT WRITE", n, len(f.Body))
231 if c.dld.wde && c.dld.dns && isErrorTimeout(e) {
232 c.log("invoking write deadline callback 2")
233 c.dld.dlnotify(e, true)
235 f.Body = f.Body[n:]
// isErrorTimeout reports whether e is a network error that signals a
// timeout. It returns false for nil, for errors that do not implement
// net.Error, and for network errors whose Timeout method reports false.
func isErrorTimeout(e error) bool {
	// The assertion fails for nil, so no separate nil check is needed.
	ne, ok := e.(net.Error)
	return ok && ne.Timeout()
}