-
Notifications
You must be signed in to change notification settings - Fork 641
/
Copy pathclient.go
112 lines (88 loc) · 2 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package mqtt
import (
"bytes"
"encoding/binary"
"errors"
"io"
"net"
"time"
)
const Timeout = time.Second * 5
type Client struct {
conn net.Conn
mid uint16
}
func NewClient(conn net.Conn) *Client {
return &Client{conn: conn, mid: 2}
}
func (c *Client) Connect(clientID, username, password string) (err error) {
if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
return
}
msg := NewConnect(clientID, username, password)
if _, err = c.conn.Write(msg.b); err != nil {
return
}
b := make([]byte, 4)
if _, err = io.ReadFull(c.conn, b); err != nil {
return
}
if !bytes.Equal(b, []byte{CONNACK, 2, 0, 0}) {
return errors.New("wrong login")
}
return
}
func (c *Client) Subscribe(topic string) (err error) {
if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
return
}
c.mid++
msg := NewSubscribe(c.mid, topic, 1)
_, err = c.conn.Write(msg.b)
return
}
func (c *Client) Publish(topic string, payload []byte) (err error) {
if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
return
}
c.mid++
msg := NewPublishQOS1(c.mid, topic, payload)
_, err = c.conn.Write(msg.b)
return
}
func (c *Client) Read() (string, []byte, error) {
if err := c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
return "", nil, err
}
b := make([]byte, 1)
if _, err := io.ReadFull(c.conn, b); err != nil {
return "", nil, err
}
size, err := ReadLen(c.conn)
if err != nil {
return "", nil, err
}
b0 := b[0]
b = make([]byte, size)
if _, err = io.ReadFull(c.conn, b); err != nil {
return "", nil, err
}
if b0&0xF0 != PUBLISH {
return "", nil, nil
}
i := binary.BigEndian.Uint16(b)
if uint32(i) > size {
return "", nil, errors.New("wrong topic size")
}
b = b[2:]
if qos := (b0 >> 1) & 0b11; qos == 0 {
return string(b[:i]), b[i:], nil
}
// response with packet ID
_, _ = c.conn.Write([]byte{PUBACK, 2, b[i], b[i+1]})
return string(b[2:i]), b[i+2:], nil
}
func (c *Client) Close() error {
// TODO: Teardown
return c.conn.Close()
}