-
Notifications
You must be signed in to change notification settings - Fork 641
/
Copy pathmessage.go
122 lines (99 loc) · 2.09 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package mqtt
import (
"io"
)
// Message accumulates the raw bytes of an MQTT packet as they are written.
// The zero value is an empty message that is ready to be written to.
type Message struct {
// b is the encoded packet so far; every Write* method appends to it.
b []byte
}
// First-byte values of the MQTT fixed header. The control packet type lives in
// the upper four bits and the packet flags in the lower four.
//
// See https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
const (
	CONNECT   = 1 << 4      // 0x10
	CONNACK   = 2 << 4      // 0x20
	PUBLISH   = 3 << 4      // 0x30
	PUBACK    = 4 << 4      // 0x40
	SUBSCRIBE = 8<<4 | 1<<1 // 0x82, type 8 plus the flag bit 0x02
	SUBACK    = 9 << 4      // 0x90

	// QOS1 is OR-ed into PUBLISH to mark a QoS 1 publication.
	QOS1 = 1 << 1 // 0x02
)
// WriteByte appends the single byte b to the end of the message.
func (m *Message) WriteByte(b byte) {
	grown := append(m.b, b)
	m.b = grown
}
// WriteBytes appends a copy of every byte in b, in order, to the message.
// An empty or nil b leaves the message untouched.
func (m *Message) WriteBytes(b []byte) {
	if len(b) == 0 {
		return
	}
	m.b = append(m.b, b...)
}
// WriteUint16 appends i as two bytes in big-endian order (most significant
// byte first).
func (m *Message) WriteUint16(i uint16) {
	hi := byte(i >> 8)
	lo := byte(i & 0xFF)
	m.b = append(m.b, hi, lo)
}
// WriteLen appends i encoded as an MQTT Variable Byte Integer: seven value
// bits per byte, least significant group first, with the high bit of each
// byte set when another byte follows.
//
// A length of zero is encoded as the single byte 0x00. Negative values are
// not representable and are encoded as zero as well, so that a length field
// is always present in the packet.
//
// MQTT limits this encoding to four bytes (a maximum of 268,435,455); larger
// values are not rejected here and produce more than four bytes.
func (m *Message) WriteLen(i int) {
	if i <= 0 {
		// The loop below emits nothing for zero, which would drop the
		// length field from the packet entirely.
		m.b = append(m.b, 0)
		return
	}
	for i > 0 {
		b := byte(i & 0x7F)
		i >>= 7
		if i > 0 {
			b |= 0x80 // continuation bit: more length bytes follow
		}
		m.b = append(m.b, b)
	}
}
// WriteString appends s as a length-prefixed string: a two-byte big-endian
// byte count followed by the bytes of s.
//
// The count is len(s) converted to uint16, so for strings longer than 65535
// bytes the prefix wraps around while the full string is still appended.
func (m *Message) WriteString(s string) {
	n := uint16(len(s))
	m.b = append(m.b, byte(n>>8), byte(n))
	m.b = append(m.b, s...)
}
// Bytes returns everything written to the message so far. The returned slice
// is the message's own buffer, not a copy, so changes to it are visible
// through the message.
func (m *Message) Bytes() []byte {
	encoded := m.b
	return encoded
}
// Bits of the connect-flags byte that NewConnect writes into the CONNECT
// packet, listed from lowest to highest bit.
const (
	flagCleanStart = 1 << 1 // 0x02
	flagPassword   = 1 << 6 // 0x40
	flagUsername   = 1 << 7 // 0x80
)
// NewConnect builds a CONNECT packet carrying the given client identifier,
// username and password.
//
// The packet announces protocol name "MQTT" with level byte 4, sets the
// clean-start, username and password flags unconditionally, and asks for a
// keepalive of 30. Username and password are always written, even when empty.
func NewConnect(clientID, username, password string) *Message {
	const (
		// Bytes that do not depend on the arguments: protocol name (2+4),
		// level (1), flags (1), keepalive (2) and three string prefixes (3*2).
		fixedSize = 16
		level     = 4
		keepalive = 30
	)
	remaining := fixedSize + len(clientID) + len(username) + len(password)
	flags := byte(flagCleanStart | flagUsername | flagPassword)

	m := new(Message)

	// Fixed header.
	m.WriteByte(CONNECT)
	m.WriteLen(remaining)

	// Variable header.
	m.WriteString("MQTT")
	m.WriteByte(level) // MQTT version
	m.WriteByte(flags)
	m.WriteUint16(keepalive)

	// Payload.
	m.WriteString(clientID)
	m.WriteString(username)
	m.WriteString(password)

	return m
}
// NewSubscribe builds a SUBSCRIBE packet with packet identifier mid that
// requests a single topic filter at the given qos.
func NewSubscribe(mid uint16, topic string, qos byte) *Message {
	// Packet identifier (2) + topic length prefix (2) + qos byte (1).
	const overhead = 5
	remaining := overhead + len(topic)

	m := new(Message)
	m.WriteByte(SUBSCRIBE)
	m.WriteLen(remaining)

	m.WriteUint16(mid)

	m.WriteString(topic)
	m.WriteByte(qos)

	return m
}
// NewPublish builds a PUBLISH packet with no flag bits set. The packet holds
// the length-prefixed topic followed directly by the payload bytes; no packet
// identifier is written.
func NewPublish(topic string, payload []byte) *Message {
	// The 2 accounts for the topic's length prefix.
	remaining := 2 + len(topic) + len(payload)

	m := new(Message)
	m.WriteByte(PUBLISH)
	m.WriteLen(remaining)

	m.WriteString(topic)
	m.WriteBytes(payload)

	return m
}
// NewPublishQOS1 builds a PUBLISH packet with the QOS1 flag set. The packet
// holds the length-prefixed topic, then the packet identifier mid, then the
// payload bytes.
func NewPublishQOS1(mid uint16, topic string, payload []byte) *Message {
	// Topic length prefix (2) + packet identifier (2).
	const overhead = 4
	remaining := overhead + len(topic) + len(payload)
	header := byte(PUBLISH | QOS1)

	m := new(Message)
	m.WriteByte(header)
	m.WriteLen(remaining)

	m.WriteString(topic)
	m.WriteUint16(mid)
	m.WriteBytes(payload)

	return m
}
// malformedLenError is returned by ReadLen when the encoded length does not
// terminate within the four bytes MQTT allows.
type malformedLenError struct{}

// Error implements the error interface.
func (malformedLenError) Error() string {
	return "mqtt: malformed variable byte integer"
}

// ReadLen reads an MQTT Variable Byte Integer from r and returns its value.
//
// Bytes are consumed one at a time, so nothing beyond the encoded integer is
// read from r. Each byte contributes its low seven bits, least significant
// group first; a set high bit means another byte follows.
//
// It returns the reader's error if a byte cannot be read (io.EOF when r is
// exhausted), and a non-nil error if the fourth byte still has its
// continuation bit set, since MQTT limits the encoding to four bytes. The
// returned value is 0 whenever the error is non-nil.
func ReadLen(r io.Reader) (uint32, error) {
	var (
		value uint32
		buf   [1]byte
	)
	// Four iterations at most: shifts 0, 7, 14 and 21.
	for shift := uint(0); shift < 28; shift += 7 {
		// io.ReadFull retries reads that return (0, nil) and keeps a byte
		// that arrives together with an error, both of which the io.Reader
		// contract permits.
		if _, err := io.ReadFull(r, buf[:]); err != nil {
			return 0, err
		}
		value |= uint32(buf[0]&0x7F) << shift
		if buf[0]&0x80 == 0 {
			return value, nil
		}
	}
	return 0, malformedLenError{}
}