Skip to content

Commit

Permalink
Ignore retain as published v3 (#142)
Browse files Browse the repository at this point in the history
* Optimise Capabilities struct alignment

* Only use RetainAsPublished for v5 clients
  • Loading branch information
mochi-co authored Jan 13, 2023
1 parent 0de1d73 commit 3001524
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 25 deletions.
22 changes: 6 additions & 16 deletions packets/tpackets.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,32 +1824,22 @@ var TPacketData = map[byte]TPacketCases{
Case: TPublishRetainMqtt5,
Desc: "retain mqtt5",
RawBytes: []byte{
Publish<<4 | 1<<0, 35, // Fixed header
Publish<<4 | 1<<0, 19, // Fixed header
0, 5, // Topic Name - LSB+MSB
'a', '/', 'b', '/', 'c', // Topic Name
16, // properties length
38, // User Properties (38)
0, 5, 'h', 'e', 'l', 'l', 'o',
0, 6, 228, 184, 150, 231, 149, 140,
0, // properties length
'h', 'e', 'l', 'l', 'o', ' ', 'm', 'o', 'c', 'h', 'i', // Payload
},
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Publish,
Retain: true,
Remaining: 35,
Remaining: 19,
},
TopicName: "a/b/c",
Properties: Properties{
User: []UserProperty{
{
Key: "hello",
Val: "世界",
},
},
},
Payload: []byte("hello mochi"),
TopicName: "a/b/c",
Properties: Properties{},
Payload: []byte("hello mochi"),
},
},
{
Expand Down
8 changes: 4 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
Version = "2.1.4" // the current server version.
Version = "2.1.5" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
defaultFanPoolSize uint64 = 32 // the number of concurrent workers in the pool
defaultFanPoolQueueSize uint64 = 1024 // the capacity of each worker queue
Expand Down Expand Up @@ -61,13 +61,13 @@ type Capabilities struct {
ReceiveMaximum uint16
TopicAliasMaximum uint16
ServerKeepAlive uint16
SharedSubAvailable byte
MinimumProtocolVersion byte
Compatibilities Compatibilities
MaximumQos byte
RetainAvailable byte
WildcardSubAvailable byte
SubIDAvailable byte
SharedSubAvailable byte
MinimumProtocolVersion byte
}

// Compatibilities provides flags for using compatibility modes.
Expand Down Expand Up @@ -785,7 +785,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
}

out := pk.Copy(false)
if !sub.RetainAsPublished { // ![MQTT-3.3.1-13]
if cl.Properties.ProtocolVersion == 5 && !sub.RetainAsPublished { // ![MQTT-3.3.1-13]
out.FixedHeader.Retain = false // [MQTT-3.3.1-12]
}

Expand Down
10 changes: 5 additions & 5 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func TestServerProcessPacketPublishAndReceive(t *testing.T) {
w2.Close()
}()

require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf)
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-receiverBuf)
require.Equal(t, 1, len(s.Topics.Messages("a/b/c")))
}

Expand Down Expand Up @@ -1537,7 +1537,7 @@ func TestPublishRetainedToClient(t *testing.T) {
subbed := s.Topics.Subscribe(cl.ID, packets.Subscription{Filter: "a/b/c", Qos: 2})
require.True(t, subbed)

retained := s.Topics.RetainMessage(*packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).Packet)
retained := s.Topics.RetainMessage(*packets.TPacketData[packets.Publish].Get(packets.TPublishRetainMqtt5).Packet)
require.Equal(t, int64(1), retained)

go func() {
Expand All @@ -1548,7 +1548,7 @@ func TestPublishRetainedToClient(t *testing.T) {

buf, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, buf)
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, buf)
}

func TestPublishRetainedToClientIsShared(t *testing.T) {
Expand Down Expand Up @@ -2064,7 +2064,7 @@ func TestServerProcessSubscribeWithRetain(t *testing.T) {
require.NoError(t, err)
require.Equal(t, append(
packets.TPacketData[packets.Suback].Get(packets.TSuback).RawBytes,
packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes...,
packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes...,
), buf)
}

Expand Down Expand Up @@ -2452,7 +2452,7 @@ func TestServerSendLWTDelayed(t *testing.T) {
recv <- buf
}()

require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-recv)
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-recv)
}

func TestServerReadStore(t *testing.T) {
Expand Down

0 comments on commit 3001524

Please sign in to comment.