-
Notifications
You must be signed in to change notification settings - Fork 0
/
read.go
99 lines (87 loc) · 2.2 KB
/
read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package stompy
import (
"bufio"
"net"
"strings"
)
type stompReader interface {
ReadBytes(c byte) ([]byte, error)
ReadString(c byte) (string, error)
}
type stompSocketReader struct {
decoder decoder
reader stompReader
shutdown chan bool
errChan chan error
msgChan chan Frame // we may be over complicating here
}
func newStompReader(con net.Conn, shutdownCh chan bool, errChan chan error, msgChan chan Frame, decoder decoder) stompSocketReader {
return stompSocketReader{
decoder: decoder,
reader: bufio.NewReader(con),
shutdown: shutdownCh,
errChan: errChan,
msgChan: msgChan,
}
}
//reads a single frame of the wire
func (sr stompSocketReader) readFrame() (Frame, error) {
f := Frame{}
line, err := sr.reader.ReadBytes('\n')
//count this as a connection error. will be sent via error channel to the reconnect handler
if err != nil {
return f, ConnectionError(err.Error())
}
f.Command = line
//sort out our headers
f.Headers = StompHeaders{}
for {
header, err := sr.reader.ReadString('\n')
if nil != err {
return f, err
}
if header == "\n" {
//reached end of headers break should we set some short deadlock break?
break
}
parsed := strings.SplitN(header[0:len(header)-1], ":", 2)
if len(parsed) != 2 {
return f, BadFrameError("failed to parse header correctly " + header)
}
key := sr.decoder.Decode(parsed[0])
val := sr.decoder.Decode(parsed[1])
f.Headers[key] = val
}
//read body
body, err := sr.reader.ReadBytes('\n')
if err != nil {
return f, err
}
if 0 != len(body) {
//return all but last 2 bytes which are a nul byte and a \n
f.Body = body[0 : len(body)-2]
}
return f, nil
}
func (sr stompSocketReader) startReadLoop() {
//read a frame, if it is a subscription send it be handled
for {
frame, err := sr.readFrame() //this will block until it reads
//if we have clean shutdown ignore the error as the connection has been closed
select {
case <-sr.shutdown:
return
default:
if err != nil {
if _, ok := err.(BadFrameError); ok {
sr.errChan <- err
} else {
sr.errChan <- ConnectionError("failed when reading frame " + err.Error())
sr.shutdown <- true
}
} else {
sr.msgChan <- frame
}
}
}
}